streaming update #2

Closed · wants to merge 24 commits

Commits (24)
b309d9b
Nessie: Support views for NessieCatalog (#8909)
ajantha-bhat Dec 12, 2023
0c5b87a
Data: Add GenericFileWriterFactory (#9267)
aokolnychyi Dec 12, 2023
09b44bb
Hive: Introduce HiveMetastoreExtension for Hive tests (#9282)
nk1506 Dec 12, 2023
36ecab4
Core: Add StandardEncryptionManager (#9277)
rdblue Dec 12, 2023
d631e2c
Core, Spark: Avoid manifest copies when importing data to V2 tables (…
aokolnychyi Dec 13, 2023
3112ec9
Build: Bump org.apache.httpcomponents.client5:httpclient5 (#9260)
dependabot[bot] Dec 13, 2023
60876e4
Hive: Make HiveMetastoreExtension configurable (#9288)
nastra Dec 13, 2023
7240752
Docs: Add spec-id for rewrite manifests (#9253)
puchengy Dec 13, 2023
11608e1
JDBC Catalog: Fix namespaceExists check with special characters (#8340)
ismailsimsek Dec 13, 2023
46df2ce
API: Restore RuntimeIOException for use (#5640)
danielcweeks Dec 14, 2023
c6bbbdb
Spark: Remove support for Spark 3.2 (#9295)
ajantha-bhat Dec 14, 2023
5e62e47
Spark: Fix flaky tests which concurrently modify HashSet (#9294)
manuzhang Dec 14, 2023
7a42120
Docs: Update readme status paragraph (#9272)
mt-ronkorving Dec 14, 2023
09290c5
Core: Remove deprecated classes related to rewrite data files (#9296)
ajantha-bhat Dec 14, 2023
5487c17
Hive: Refactor TestHiveCatalog tests to use CatalogTests (#8918)
nk1506 Dec 14, 2023
8572c56
API, Core: Move SQLViewRepresentation to API (#9302)
nastra Dec 14, 2023
d56dd63
Doc: Adding documentation for flink iceberg connector for version 1.1…
rodmeneses Dec 14, 2023
8181f84
API, Core: Add sqlFor API to views to handle resolving a representati…
amogh-jahagirdar Dec 15, 2023
d6a4ca7
API: Fix equals and hashCode in CharSequenceSet (#9245)
aokolnychyi Dec 16, 2023
395e01e
Core: Make sqlFor case insensitive for dialect check (#9311)
amogh-jahagirdar Dec 16, 2023
24578a2
Core: Fix metadata table uuid to return a consistent UUID for the sam…
ajantha-bhat Dec 16, 2023
901f124
MergingSnapshotProducer: Change file holder to be generic
jasonf20 Dec 11, 2023
5f8d40e
MergingSnapshotProducer: Support adding data files at a specific sequ…
jasonf20 Dec 11, 2023
3e67c95
Table Operations: Added streaming update operation
jasonf20 Dec 11, 2023
2 changes: 1 addition & 1 deletion .github/workflows/publish-snapshot.yml
@@ -41,4 +41,4 @@ jobs:
       - run: |
           ./gradlew printVersion
           ./gradlew -DallVersions publishApachePublicationToMavenRepository -PmavenUser=${{ secrets.NEXUS_USER }} -PmavenPassword=${{ secrets.NEXUS_PW }}
-          ./gradlew -DflinkVersions= -DsparkVersions=3.2,3.3,3.4,3.5 -DscalaVersion=2.13 -DhiveVersions= publishApachePublicationToMavenRepository -PmavenUser=${{ secrets.NEXUS_USER }} -PmavenPassword=${{ secrets.NEXUS_PW }}
+          ./gradlew -DflinkVersions= -DsparkVersions=3.3,3.4,3.5 -DscalaVersion=2.13 -DhiveVersions= publishApachePublicationToMavenRepository -PmavenUser=${{ secrets.NEXUS_USER }} -PmavenPassword=${{ secrets.NEXUS_PW }}
4 changes: 2 additions & 2 deletions .github/workflows/spark-ci.yml
@@ -58,7 +58,7 @@ jobs:
     strategy:
       matrix:
         jvm: [8, 11]
-        spark: ['3.2', '3.3', '3.4', '3.5']
+        spark: ['3.3', '3.4', '3.5']
     env:
       SPARK_LOCAL_IP: localhost
     steps:
@@ -88,7 +88,7 @@ jobs:
     strategy:
       matrix:
         jvm: [8, 11]
-        spark: ['3.2','3.3','3.4','3.5']
+        spark: ['3.3','3.4','3.5']
     env:
       SPARK_LOCAL_IP: localhost
     steps:
1 change: 0 additions & 1 deletion .gitignore
@@ -27,7 +27,6 @@ lib/
 site/site
 
 # benchmark output
-spark/v3.2/spark/benchmark/*
 spark/v3.3/spark/benchmark/*
 spark/v3.3/spark-extensions/benchmark/*
 spark/v3.4/spark/benchmark/*
9 changes: 9 additions & 0 deletions .palantir/revapi.yml
@@ -874,6 +874,15 @@ acceptedBreaks:
       justification: "Static utility class - should not have public constructor"
   "1.4.0":
     org.apache.iceberg:iceberg-core:
+    - code: "java.class.removed"
+      old: "class org.apache.iceberg.actions.BinPackStrategy"
+      justification: "Removing deprecated code"
+    - code: "java.class.removed"
+      old: "class org.apache.iceberg.actions.SortStrategy"
+      justification: "Removing deprecated code"
+    - code: "java.class.removed"
+      old: "interface org.apache.iceberg.actions.RewriteStrategy"
+      justification: "Removing deprecated code"
     - code: "java.field.serialVersionUIDChanged"
       new: "field org.apache.iceberg.util.SerializableMap<K, V>.serialVersionUID"
       justification: "Serialization is not be used"
12 changes: 7 additions & 5 deletions README.md
@@ -31,15 +31,17 @@ Background and documentation is available at <https://iceberg.apache.org>
 
 Iceberg is under active development at the Apache Software Foundation.
 
-The core Java library that tracks table snapshots and metadata is complete, but still evolving. Current work is focused on adding row-level deletes and upserts, and integration work with new engines like Flink and Hive.
+The [Iceberg format specification][iceberg-spec] is stable and new features are added with each version.
 
-The [Iceberg format specification][iceberg-spec] is being actively updated and is open for comment. Until the specification is complete and released, it carries no compatibility guarantees. The spec is currently evolving as the Java reference implementation changes.
+The core Java library is located in this repository and is the reference implementation for other libraries.
 
-[Java API javadocs][iceberg-javadocs] are available for the main.
+[Documentation][iceberg-docs] is available for all libraries and integrations.
 
-[iceberg-javadocs]: https://iceberg.apache.org/javadoc/latest
-[iceberg-spec]: https://iceberg.apache.org/spec
+Current work is tracked in the [roadmap][roadmap].
+
+[iceberg-docs]: https://iceberg.apache.org/docs/latest/
+[iceberg-spec]: https://iceberg.apache.org/spec
+[roadmap]: https://iceberg.apache.org/roadmap/
 
 ## Collaboration
18 changes: 9 additions & 9 deletions api/src/main/java/org/apache/iceberg/AppendFiles.java
@@ -42,16 +42,16 @@ public interface AppendFiles extends SnapshotUpdate<AppendFiles> {
    * <p>The manifest must contain only appended files. All files in the manifest will be appended to
    * the table in the snapshot created by this update.
    *
-   * <p>By default, the manifest will be rewritten to assign all entries this update's snapshot ID.
-   * In that case, it is always the responsibility of the caller to manage the lifecycle of the
-   * original manifest.
+   * <p>The manifest will be used directly if snapshot ID inheritance is enabled (all tables with
+   * the format version &gt; 1 or if the inheritance is enabled explicitly via table properties).
+   * Otherwise, the manifest will be rewritten to assign all entries this update's snapshot ID.
    *
-   * <p>If manifest entries are allowed to inherit the snapshot ID assigned on commit, the manifest
-   * should never be deleted manually if the commit succeeds as it will become part of the table
-   * metadata and will be cleaned up on expiry. If the manifest gets merged with others while
-   * preparing a new snapshot, it will be deleted automatically if this operation is successful. If
-   * the commit fails, the manifest will never be deleted and it is up to the caller whether to
-   * delete or reuse it.
+   * <p>If the manifest is rewritten, it is always the responsibility of the caller to manage the
+   * lifecycle of the original manifest. If the manifest is used directly, it should never be
+   * deleted manually if the commit succeeds as it will become part of the table metadata and will
+   * be cleaned upon expiry. If the manifest gets merged with others while preparing a new snapshot,
+   * it will be deleted automatically if this operation is successful. If the commit fails, the
+   * manifest will never be deleted, and it is up to the caller whether to delete or reuse it.
    *
    * @param file a manifest file
    * @return this for method chaining
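For context, a minimal sketch of how appendManifest is typically used — the helper class and its inputs are assumptions for illustration, not part of this diff. With snapshot ID inheritance (format v2+), the committed manifest is adopted as-is and becomes part of table metadata.

import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Table;

class AppendManifestExample {
  // `table` and `manifest` are assumed inputs; the manifest must contain only appended files.
  static void appendExistingManifest(Table table, ManifestFile manifest) {
    AppendFiles append = table.newAppend();
    append.appendManifest(manifest);
    // On success with snapshot ID inheritance, the manifest is owned by the table;
    // on failure, the caller decides whether to delete or reuse it.
    append.commit();
  }
}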
129 changes: 129 additions & 0 deletions api/src/main/java/org/apache/iceberg/StreamingUpdate.java
@@ -0,0 +1,129 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg;

import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;

/**
 * API for appending sequential updates to a table
 *
 * <p>This API accumulates batches of file additions and deletions by order, produces a new {@link
 * Snapshot} of the changes where each batch is added to a new data sequence number, and commits
 * that snapshot as the current.
 *
 * <p>When committing, these changes will be applied to the latest table snapshot. Commit conflicts
 * will be resolved by applying the changes to the new latest snapshot and reattempting the commit.
 * If any of the deleted files are no longer in the latest snapshot when reattempting, the commit
 * will throw a {@link ValidationException}.
 */
public interface StreamingUpdate extends SnapshotUpdate<StreamingUpdate> {

  /**
   * Start a new batch of changes. The changes in this batch will have a sequence number larger than
   * the changes in the previous batches.
   *
   * @return this for method chaining
   */
  default StreamingUpdate newBatch() {
    throw new UnsupportedOperationException(
        this.getClass().getName() + " does not implement newBatch");
  }

  /**
   * Add a new data file to the current batch. All files in this batch will receive the same data
   * sequence number.
   *
   * @param dataFile a new data file
   * @return this for method chaining
   */
  default StreamingUpdate addFile(DataFile dataFile) {
    throw new UnsupportedOperationException(
        this.getClass().getName() + " does not implement addFile");
  }

  /**
   * Add a new delete file to the current batch. All files in this batch will receive the same data
   * sequence number.
   *
   * @param deleteFile a new delete file
   * @return this for method chaining
   */
  default StreamingUpdate addFile(DeleteFile deleteFile) {
    throw new UnsupportedOperationException(
        this.getClass().getName() + " does not implement addFile");
  }

  /**
   * Set the snapshot ID used in any reads for this operation.
   *
   * <p>Validations will check changes after this snapshot ID. If the from snapshot is not set, all
   * ancestor snapshots through the table's initial snapshot are validated.
   *
   * @param snapshotId a snapshot ID
   * @return this for method chaining
   */
  StreamingUpdate validateFromSnapshot(long snapshotId);

  /**
   * Sets a conflict detection filter used to validate concurrently added data and delete files.
   *
   * <p>If not called, a true literal will be used as the conflict detection filter.
   *
   * @param conflictDetectionFilter an expression on rows in the table
   * @return this for method chaining
   */
  StreamingUpdate conflictDetectionFilter(Expression conflictDetectionFilter);

  /**
   * Enables validation that data files added concurrently do not conflict with this commit's
   * operation.
   *
   * <p>This method should be called when the table is queried to determine which files to
   * delete/append. If a concurrent operation commits a new file after the data was read and that
   * file might contain rows matching the specified conflict detection filter, this operation will
   * detect this during retries and fail.
   *
   * <p>Calling this method is required to maintain serializable isolation for update/delete
   * operations. Otherwise, the isolation level will be snapshot isolation.
   *
   * <p>Validation uses the conflict detection filter passed to {@link
   * #conflictDetectionFilter(Expression)} and applies to operations that happened after the
   * snapshot passed to {@link #validateFromSnapshot(long)}.
   *
   * @return this for method chaining
   */
  StreamingUpdate validateNoConflictingDataFiles();

  /**
   * Enables validation that delete files added concurrently do not conflict with this commit's
   * operation.
   *
   * <p>This method must be called when the table is queried to produce a row delta for UPDATE and
   * MERGE operations independently of the isolation level. Calling this method isn't required for
   * DELETE operations as it is OK to delete a record that is also deleted concurrently.
   *
   * <p>Validation uses the conflict detection filter passed to {@link
   * #conflictDetectionFilter(Expression)} and applies to operations that happened after the
   * snapshot passed to {@link #validateFromSnapshot(long)}.
   *
   * @return this for method chaining
   */
  StreamingUpdate validateNoConflictingDeleteFiles();
}
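To make the batch semantics concrete, here is a hedged usage sketch (not from the PR): it assumes a Table whose implementation overrides newStreamingUpdate(), plus hypothetical batch1Data, batch1Deletes, and batch2Data files. Each newBatch() call raises the data sequence number assigned to subsequently added files.

import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.StreamingUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;

class StreamingUpdateExample {
  static void commitTwoBatches(
      Table table,
      long lastReadSnapshotId,
      DataFile batch1Data,
      DeleteFile batch1Deletes,
      DataFile batch2Data) {
    StreamingUpdate update =
        table
            .newStreamingUpdate()
            .validateFromSnapshot(lastReadSnapshotId) // validate changes after this snapshot
            .conflictDetectionFilter(Expressions.alwaysTrue())
            .validateNoConflictingDataFiles()
            .validateNoConflictingDeleteFiles();

    update.newBatch().addFile(batch1Data).addFile(batch1Deletes); // sequence number N
    update.newBatch().addFile(batch2Data); // sequence number N + 1

    update.commit(); // retried against the latest snapshot on conflict
  }
}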
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/Table.java
@@ -233,6 +233,17 @@ default AppendFiles newFastAppend() {
    */
   RewriteFiles newRewrite();
 
+  /**
+   * Create a new {@link StreamingUpdate streaming update API} to append sequential upserts to the
+   * table.
+   *
+   * @return a new {@link StreamingUpdate}
+   */
+  default StreamingUpdate newStreamingUpdate() {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " doesn't implement newStreamingUpdate()");
+  }
+
   /**
    * Create a new {@link RewriteManifests rewrite manifests API} to replace manifests for this table
    * and commit.
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/Transaction.java
@@ -95,6 +95,17 @@ default AppendFiles newFastAppend() {
    */
   RewriteFiles newRewrite();
 
+  /**
+   * Create a new {@link StreamingUpdate streaming update API} to append sequential upserts to the
+   * table.
+   *
+   * @return a new {@link StreamingUpdate}
+   */
+  default StreamingUpdate newStreamingUpdate() {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " doesn't implement newStreamingUpdate()");
+  }
+
   /**
    * Create a new {@link RewriteManifests rewrite manifests API} to replace manifests for this
    * table.
api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java
@@ -37,4 +37,9 @@ public interface EncryptedOutputFile {
    * #encryptingOutputFile()}.
    */
   EncryptionKeyMetadata keyMetadata();
+
+  /** Underlying output file for native encryption. */
+  default OutputFile plainOutputFile() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
 }
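A small sketch of how a writer might pick its output stream — the format-based dispatch here is an assumption for illustration, not part of this diff. Formats with native, in-format encryption (such as Parquet modular encryption) write through plainOutputFile() and encrypt internally; others write through the stream-encrypting encryptingOutputFile().

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.io.OutputFile;

class EncryptionTargetExample {
  // Hypothetical helper: choose the underlying file a writer should target.
  static OutputFile writeTarget(EncryptedOutputFile file, FileFormat format) {
    return format == FileFormat.PARQUET ? file.plainOutputFile() : file.encryptingOutputFile();
  }
}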
api/src/main/java/org/apache/iceberg/exceptions/RuntimeIOException.java
@@ -22,11 +22,7 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 
-/**
- * @deprecated Use java.io.UncheckedIOException directly instead.
- * <p>Exception used to wrap {@link IOException} as a {@link RuntimeException} and add context.
- */
-@Deprecated
+/** Exception used to wrap {@link IOException} as a {@link RuntimeException} and add context. */
 public class RuntimeIOException extends UncheckedIOException {
 
   public RuntimeIOException(IOException cause) {
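A minimal sketch of the restored class in use (an assumed call site, not from the PR), wrapping a checked IOException with the constructor shown in the diff above:

import java.io.IOException;
import java.io.InputStream;
import org.apache.iceberg.exceptions.RuntimeIOException;

class RuntimeIOExceptionExample {
  static int readByte(InputStream in) {
    try {
      return in.read();
    } catch (IOException e) {
      // RuntimeIOException extends java.io.UncheckedIOException, so existing
      // catch blocks for either type still match.
      throw new RuntimeIOException(e);
    }
  }
}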
20 changes: 13 additions & 7 deletions api/src/main/java/org/apache/iceberg/util/CharSequenceSet.java
@@ -21,7 +21,6 @@
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -168,22 +167,29 @@ public void clear() {
   }
 
   @Override
-  public boolean equals(Object o) {
-    if (this == o) {
+  public boolean equals(Object other) {
+    if (this == other) {
       return true;
+    } else if (!(other instanceof Set)) {
+      return false;
     }
 
-    if (o == null || getClass() != o.getClass()) {
+    Set<?> that = (Set<?>) other;
+
+    if (size() != that.size()) {
       return false;
     }
 
-    CharSequenceSet that = (CharSequenceSet) o;
-    return wrapperSet.equals(that.wrapperSet);
+    try {
+      return containsAll(that);
+    } catch (ClassCastException | NullPointerException unused) {
+      return false;
+    }
   }
 
   @Override
   public int hashCode() {
-    return Objects.hashCode(wrapperSet);
+    return wrapperSet.stream().mapToInt(CharSequenceWrapper::hashCode).sum();
   }
 
   @Override
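The practical effect of the new equals/hashCode, shown as a hedged sketch (the example class is an illustration, not from the PR): a CharSequenceSet now compares equal to any java.util.Set holding the same elements, per the Set contract, and hashCode() is the sum of element hash codes.

import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.util.CharSequenceSet;

class CharSequenceSetEqualityExample {
  public static void main(String[] args) {
    CharSequenceSet paths = CharSequenceSet.of(ImmutableSet.of("a.parquet", "b.parquet"));
    Set<String> plain = ImmutableSet.of("a.parquet", "b.parquet");
    // Before this change, equals() required the other object to be a CharSequenceSet.
    System.out.println(paths.equals(plain)); // true
  }
}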
api/src/main/java/org/apache/iceberg/view/SQLViewRepresentation.java
@@ -18,9 +18,7 @@
  */
 package org.apache.iceberg.view;
 
-import org.immutables.value.Value;
-
-@Value.Immutable
+/** SQLViewRepresentation represents views in SQL with a given dialect */
 public interface SQLViewRepresentation extends ViewRepresentation {
 
   @Override
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/view/View.java
@@ -121,4 +121,15 @@ default UpdateLocation updateLocation() {
   default UUID uuid() {
     throw new UnsupportedOperationException("Retrieving a view's uuid is not supported");
   }
+
+  /**
+   * Returns the view representation for the given SQL dialect
+   *
+   * @return the view representation for the given SQL dialect, or null if no representation could
+   *     be resolved
+   */
+  default SQLViewRepresentation sqlFor(String dialect) {
+    throw new UnsupportedOperationException(
+        "Resolving a sql with a given dialect is not supported");
+  }
 }
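A hedged sketch of resolving a representation by dialect with the new API — the helper and its fallback behavior are assumptions, not part of this diff. Note that #9311 in the commit list above makes the dialect check case-insensitive.

import org.apache.iceberg.view.SQLViewRepresentation;
import org.apache.iceberg.view.View;

class SqlForExample {
  // Hypothetical helper: resolve the SQL for a dialect, or fail clearly.
  static String sqlFor(View view, String dialect) {
    SQLViewRepresentation representation = view.sqlFor(dialect);
    if (representation == null) {
      throw new IllegalArgumentException("No SQL representation for dialect: " + dialect);
    }
    return representation.sql();
  }
}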