streaming update #1

Closed · wants to merge 47 commits

47 commits
afe4aec
Spark: Don't allow branch_ usage with VERSION AS OF (#9219)
nastra Dec 5, 2023
8519224
Flink: Document watermark generation feature (#9179)
pvary Dec 5, 2023
68d491e
Build: Bump datamodel-code-generator from 0.24.2 to 0.25.0 (#9189)
Fokko Dec 5, 2023
7b12a41
Flink: backport PR #9216 for disabling classloader check (#9226)
stevenzwu Dec 5, 2023
8b7a280
Build: Bump actions/setup-java from 3 to 4 (#9200)
dependabot[bot] Dec 5, 2023
d80d7da
Core: Handle IAE in default error handler (#9225)
nastra Dec 5, 2023
faa8b50
Core: Fix logic for determining set of committed files in BaseTransac…
amogh-jahagirdar Dec 5, 2023
8e1900d
Style: Replace Arrays.asList with Collections.singletonList (#9213)
yyy1000 Dec 5, 2023
367dc8b
Core: Add comment property to ViewProperties (#9181)
amogh-jahagirdar Dec 6, 2023
a89fc46
Spec: Clarify how column IDs are required (#9162)
emkornfield Dec 6, 2023
70ec4e5
Spark: Bump Spark minor versions for 3.3 and 3.4 (#9187)
ajantha-bhat Dec 6, 2023
d69ba05
Core: Introduce AssertViewUUID for REST catalog views (#8831)
nastra Dec 6, 2023
e276753
Core: Fix equality in StructLikeMap (#9236)
aokolnychyi Dec 6, 2023
6a9d3c7
Core: Add PartitionMap (#9194)
aokolnychyi Dec 7, 2023
af9522a
Docs: Document reading in Spark using branch and tag identifiers (#9238)
wypoon Dec 7, 2023
ea7665e
Nessie: Reimplement namespace operations (#8857)
adutra Dec 7, 2023
d929590
Docs: Update default format version to 2. (#9239)
zhongyujiang Dec 7, 2023
820fc3c
Flink: Move flink/v1.17 to flink/v1.18
rodmeneses Dec 4, 2023
b8ef64a
Recover flink/1.17 files from history
rodmeneses Dec 4, 2023
274390f
Remove Flink 1.15
rodmeneses Dec 4, 2023
22b95dc
Make Flink 1.18 to work
rodmeneses Dec 5, 2023
b79a8ff
Delta: Fix integration tests and Create DataFile by partition values …
HonahX Dec 7, 2023
263b530
Spark 3.5: Support Specifying spec_id in RewriteManifestProcedure (#9…
puchengy Dec 7, 2023
feeaa8c
Spark 3.5: Rework DeleteFileIndexBenchmark (#9165)
aokolnychyi Dec 8, 2023
3f5f4d9
Spark 3.2, 3.3, 3.4: Support specifying spec_id in RewriteManifestPro…
puchengy Dec 8, 2023
504c134
Spark 3.5: Fix testReplacePartitionField for rewriting manifests (#9250)
bknbkn Dec 8, 2023
62a23a3
Core: Fix null partitions in PartitionSet (#9248)
aokolnychyi Dec 8, 2023
beb41b6
Flink: switch to use SortKey for data statistics (#9212)
stevenzwu Dec 8, 2023
4d0b69b
Flink: Fix IcebergSource tableloader lifecycle management in batch mo…
mas-chen Dec 9, 2023
2152269
Flink: backport PR #9212 to 1.16 for switching to SortKey for data st…
stevenzwu Dec 8, 2023
2c31acc
Flink: backport PR #9212 to 1.18 for switching to SortKey for data st…
stevenzwu Dec 8, 2023
5e03d06
Build: Bump actions/setup-python from 4 to 5 (#9266)
dependabot[bot] Dec 10, 2023
1b80537
Build: Bump actions/labeler from 4 to 5 (#9264)
dependabot[bot] Dec 10, 2023
ec92fa3
Build: Bump actions/stale from 8.0.0 to 9.0.0 (#9265)
dependabot[bot] Dec 10, 2023
06894db
Build: Bump mkdocs-material from 9.4.12 to 9.5.1 (#9256)
dependabot[bot] Dec 10, 2023
d3deeec
Build: Bump net.snowflake:snowflake-jdbc from 3.14.3 to 3.14.4 (#9257)
dependabot[bot] Dec 10, 2023
ce9186f
Build: Bump software.amazon.awssdk:bom from 2.21.29 to 2.21.42 (#9259)
dependabot[bot] Dec 10, 2023
0331aba
Build: Bump com.google.cloud:libraries-bom from 26.27.0 to 26.28.0 (#…
dependabot[bot] Dec 10, 2023
7d06af3
Core: Improve view/table detection when replacing a table/view (#9012)
nastra Dec 10, 2023
4090a88
Core: Add REST catalog table session cache (#8920)
nastra Dec 10, 2023
61cf766
Build: Bump datamodel-code-generator from 0.24.2 to 0.25.1 (#9199)
dependabot[bot] Dec 11, 2023
1b95305
Revert "Build: Bump actions/labeler from 4 to 5 (#9264)" (#9271)
ajantha-bhat Dec 11, 2023
4920189
Open-API: Refactor updates with discriminator (#9240)
Fokko Dec 11, 2023
f21199d
MR: Migrate tests to JUnit5 (#9241)
lschetanrao Dec 11, 2023
934a3f9
MergingSnapshotProducer: Change file holder to be generic
jasonf20 Dec 11, 2023
01762b1
MergingSnapshotProducer: Support adding data files at a specific sequ…
jasonf20 Dec 11, 2023
5357e35
Table Operations: Added streaming update operation
jasonf20 Dec 11, 2023
2 changes: 1 addition & 1 deletion .github/workflows/api-binary-compatibility.yml
@@ -46,7 +46,7 @@ jobs:
#
# See https://github.com/actions/checkout/issues/124
fetch-depth: 0
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: 11
8 changes: 4 additions & 4 deletions .github/workflows/delta-conversion-ci.yml
@@ -62,7 +62,7 @@ jobs:
SPARK_LOCAL_IP: localhost
steps:
- uses: actions/checkout@v4
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: ${{ matrix.jvm }}
@@ -74,7 +74,7 @@ jobs:
key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }}
restore-keys: ${{ runner.os }}-gradle-
- run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
- - run: ./gradlew -DsparkVersions=3.3 -DscalaVersion=2.12 -DhiveVersions= -DflinkVersions= :iceberg-delta-lake:check -Pquick=true -x javadoc
+ - run: ./gradlew -DsparkVersions=3.5 -DscalaVersion=2.12 -DhiveVersions= -DflinkVersions= :iceberg-delta-lake:check -Pquick=true -x javadoc
- uses: actions/upload-artifact@v3
if: failure()
with:
@@ -91,7 +91,7 @@ jobs:
SPARK_LOCAL_IP: localhost
steps:
- uses: actions/checkout@v4
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: ${{ matrix.jvm }}
@@ -103,7 +103,7 @@ jobs:
key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }}
restore-keys: ${{ runner.os }}-gradle-
- run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
- - run: ./gradlew -DsparkVersions=3.3 -DscalaVersion=2.13 -DhiveVersions= -DflinkVersions= :iceberg-delta-lake:check -Pquick=true -x javadoc
+ - run: ./gradlew -DsparkVersions=3.5 -DscalaVersion=2.13 -DhiveVersions= -DflinkVersions= :iceberg-delta-lake:check -Pquick=true -x javadoc
- uses: actions/upload-artifact@v3
if: failure()
with:
4 changes: 2 additions & 2 deletions .github/workflows/flink-ci.yml
@@ -60,12 +60,12 @@ jobs:
strategy:
matrix:
jvm: [8, 11]
- flink: ['1.15', '1.16', '1.17']
+ flink: ['1.16', '1.17', '1.18']
env:
SPARK_LOCAL_IP: localhost
steps:
- uses: actions/checkout@v4
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: ${{ matrix.jvm }}
4 changes: 2 additions & 2 deletions .github/workflows/hive-ci.yml
@@ -60,7 +60,7 @@ jobs:
SPARK_LOCAL_IP: localhost
steps:
- uses: actions/checkout@v4
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: ${{ matrix.jvm }}
@@ -86,7 +86,7 @@ jobs:
SPARK_LOCAL_IP: localhost
steps:
- uses: actions/checkout@v4
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: 8
6 changes: 3 additions & 3 deletions .github/workflows/java-ci.yml
@@ -57,7 +57,7 @@ jobs:
SPARK_LOCAL_IP: localhost
steps:
- uses: actions/checkout@v4
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: ${{ matrix.jvm }}
@@ -81,7 +81,7 @@ jobs:
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v4
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: 8
@@ -91,7 +91,7 @@ jobs:
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v4
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: 8
2 changes: 1 addition & 1 deletion .github/workflows/jmh-benchmarks.yml
@@ -79,7 +79,7 @@ jobs:
with:
repository: ${{ github.event.inputs.repo }}
ref: ${{ github.event.inputs.ref }}
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: 11
2 changes: 1 addition & 1 deletion .github/workflows/open-api.yml
@@ -40,7 +40,7 @@ jobs:

steps:
- uses: actions/checkout@v4
- - uses: actions/setup-python@v4
+ - uses: actions/setup-python@v5
with:
python-version: 3.9
- name: Install
2 changes: 1 addition & 1 deletion .github/workflows/publish-snapshot.yml
@@ -34,7 +34,7 @@ jobs:
with:
# we need to fetch all tags so that getProjectVersion() in build.gradle correctly determines the next SNAPSHOT version from the newest tag
fetch-depth: 0
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: 8
2 changes: 1 addition & 1 deletion .github/workflows/recurring-jmh-benchmarks.yml
@@ -49,7 +49,7 @@ jobs:
with:
repository: ${{ github.event.inputs.repo }}
ref: ${{ github.event.inputs.ref }}
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: 11
6 changes: 3 additions & 3 deletions .github/workflows/spark-ci.yml
@@ -63,7 +63,7 @@ jobs:
SPARK_LOCAL_IP: localhost
steps:
- uses: actions/checkout@v4
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: ${{ matrix.jvm }}
@@ -93,7 +93,7 @@ jobs:
SPARK_LOCAL_IP: localhost
steps:
- uses: actions/checkout@v4
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: ${{ matrix.jvm }}
@@ -123,7 +123,7 @@ jobs:
SPARK_LOCAL_IP: localhost
steps:
- uses: actions/checkout@v4
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: zulu
java-version: 17
2 changes: 1 addition & 1 deletion .github/workflows/stale.yml
@@ -31,7 +31,7 @@ jobs:
if: github.repository_owner == 'apache'
runs-on: ubuntu-22.04
steps:
- - uses: actions/stale@v8.0.0
+ - uses: actions/stale@v9.0.0
with:
stale-issue-label: 'stale'
exempt-issue-labels: 'not-stale'
99 changes: 99 additions & 0 deletions api/src/main/java/org/apache/iceberg/StreamingUpdate.java
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import org.apache.iceberg.exceptions.ValidationException;

/**
* API for appending sequential updates to a table
*
* <p>This API accumulates batches of file additions and deletions in order, produces a new {@link
* Snapshot} of the changes in which each batch is assigned its own data sequence number, and
* commits that snapshot as the table's current snapshot.
*
* <p>When committing, these changes will be applied to the latest table snapshot. Commit conflicts
* will be resolved by applying the changes to the new latest snapshot and reattempting the commit.
* If any of the deleted files are no longer in the latest snapshot when reattempting, the commit
* will throw a {@link ValidationException}.
*/
public interface StreamingUpdate extends SnapshotUpdate<StreamingUpdate> {
/**
* Remove a data file from the current table state.
*
* <p>This rewrite operation may change the size or layout of the data files. When applicable, it
* is also recommended to discard already deleted records while rewriting data files. However, the
* set of live data records must never change.
*
* @param dataFile a rewritten data file
* @return this for method chaining
*/
default StreamingUpdate deleteFile(DataFile dataFile) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement deleteFile");
}

/**
* Remove a delete file from the table state.
*
* <p>This rewrite operation may change the size or layout of the delete files. When applicable,
* it is also recommended to discard delete records for files that are no longer part of the table
* state. However, the set of applicable delete records must never change.
*
* @param deleteFile a rewritten delete file
* @return this for method chaining
*/
default StreamingUpdate deleteFile(DeleteFile deleteFile) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement deleteFile");
}

/**
* Add a new data file to a specific batch. All files in this batch will receive the same data
* sequence number.
*
* <p>This rewrite operation may change the size or layout of the data files. When applicable, it
* is also recommended to discard already deleted records while rewriting data files. However, the
* set of live data records must never change.
*
* @param dataFile a new data file
* @param batchOrdinal the batch ordinal to associate with this data file
* @return this for method chaining
*/
default StreamingUpdate addFile(DataFile dataFile, int batchOrdinal) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement addFile");
}

/**
* Add a new delete file to a specific batch. All files in this batch will receive the same data
* sequence number.
*
* <p>This rewrite operation may change the size or layout of the delete files. When applicable,
* it is also recommended to discard delete records for files that are no longer part of the table
* state. However, the set of applicable delete records must never change.
*
* @param deleteFile a new delete file
* @param batchOrdinal the batch ordinal to associate with this delete file
* @return this for method chaining
*/
default StreamingUpdate addFile(DeleteFile deleteFile, int batchOrdinal) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement addFile");
}
}
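To make the intended call pattern concrete, here is a minimal usage sketch. It assumes a Table whose implementation provides newStreamingUpdate(); the helper name and the pre-built DataFile/DeleteFile handles are illustrative, not part of this PR.

import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.StreamingUpdate;
import org.apache.iceberg.Table;

public class StreamingUpdateExample {
  // Commit two ordered batches of changes as a single snapshot. Files added
  // with the same batchOrdinal share one data sequence number; a later batch
  // receives a higher sequence number, so its delete files can apply to data
  // files from earlier batches.
  static void commitBatches(
      Table table, DataFile batch0Data, DeleteFile batch0Deletes, DataFile batch1Data) {
    StreamingUpdate update = table.newStreamingUpdate();
    update.addFile(batch0Data, 0);
    update.addFile(batch0Deletes, 0);
    update.addFile(batch1Data, 1);
    // commit() is inherited from PendingUpdate; on a conflict the changes are
    // reapplied to the new latest snapshot and the commit is retried.
    update.commit();
  }
}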
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/Table.java
@@ -233,6 +233,17 @@ default AppendFiles newFastAppend() {
*/
RewriteFiles newRewrite();

/**
* Create a new {@link StreamingUpdate streaming update API} to append sequential upserts to the
* table.
*
* @return a new {@link StreamingUpdate}
*/
default StreamingUpdate newStreamingUpdate() {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement newStreamingUpdate()");
}

/**
* Create a new {@link RewriteManifests rewrite manifests API} to replace manifests for this table
* and commit.
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/Transaction.java
@@ -95,6 +95,17 @@ default AppendFiles newFastAppend() {
*/
RewriteFiles newRewrite();

/**
* Create a new {@link StreamingUpdate streaming update API} to append sequential upserts to the
* table.
*
* @return a new {@link StreamingUpdate}
*/
default StreamingUpdate newStreamingUpdate() {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement newStreamingUpdate()");
}

/**
* Create a new {@link RewriteManifests rewrite manifests API} to replace manifests for this
* table.
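The same operation can also be staged through a transaction; a sketch under the same assumptions as the example above, with an illustrative helper name:

import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;

public class StreamingUpdateTransactionExample {
  // Stage a streaming update inside a transaction so that its snapshot
  // commits atomically with any other updates staged on the transaction.
  static void commitInTransaction(Table table, DataFile dataFile) {
    Transaction txn = table.newTransaction();
    txn.newStreamingUpdate().addFile(dataFile, 0).commit(); // staged, not yet live
    txn.commitTransaction(); // applies all staged changes at once
  }
}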
46 changes: 46 additions & 0 deletions api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -360,6 +360,52 @@ public int hashCode() {
}
}

// similar to Row but has its own hashCode() and equals() implementations
// it is useful for testing custom collections that rely on wrappers
public static class CustomRow implements StructLike {
public static CustomRow of(Object... values) {
return new CustomRow(values);
}

private final Object[] values;

private CustomRow(Object... values) {
this.values = values;
}

@Override
public int size() {
return values.length;
}

@Override
public <T> T get(int pos, Class<T> javaClass) {
return javaClass.cast(values[pos]);
}

@Override
public <T> void set(int pos, T value) {
values[pos] = value;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
} else if (other == null || getClass() != other.getClass()) {
return false;
}

CustomRow that = (CustomRow) other;
return Arrays.equals(values, that.values);
}

@Override
public int hashCode() {
return 17 * Arrays.hashCode(values);
}
}

public static class TestFieldSummary implements ManifestFile.PartitionFieldSummary {
private final boolean containsNull;
private final Boolean containsNaN;
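A short, illustrative sketch (not part of the diff) of the equality contract the new helper provides:

import org.apache.iceberg.TestHelpers.CustomRow;

public class CustomRowDemo {
  public static void main(String[] args) {
    // CustomRow compares by value via Arrays.equals and hashes via
    // 17 * Arrays.hashCode, so tests of wrapper-based collections can use it
    // as a StructLike key with well-defined equality of its own.
    CustomRow a = CustomRow.of(1, "left");
    CustomRow b = CustomRow.of(1, "left");
    System.out.println(a.equals(b));                  // true
    System.out.println(a.hashCode() == b.hashCode()); // true
  }
}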