diff --git a/api/src/main/java/org/apache/iceberg/StreamingUpdate.java b/api/src/main/java/org/apache/iceberg/StreamingUpdate.java index 6edb4e2e71dd..65b8b07c1ba6 100644 --- a/api/src/main/java/org/apache/iceberg/StreamingUpdate.java +++ b/api/src/main/java/org/apache/iceberg/StreamingUpdate.java @@ -19,6 +19,7 @@ package org.apache.iceberg; import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expression; /** * API for appending sequential updates to a table @@ -68,4 +69,61 @@ default StreamingUpdate addFile(DeleteFile deleteFile) { throw new UnsupportedOperationException( this.getClass().getName() + " does not implement addFile"); } + + /** + * Set the snapshot ID used in any reads for this operation. + * + *
Validations will check changes after this snapshot ID. If the from snapshot is not set, all + * ancestor snapshots through the table's initial snapshot are validated. + * + * @param snapshotId a snapshot ID + * @return this for method chaining + */ + StreamingUpdate validateFromSnapshot(long snapshotId); + + /** + * Sets a conflict detection filter used to validate concurrently added data and delete files. + * + *
If not called, a true literal will be used as the conflict detection filter. + * + * @param conflictDetectionFilter an expression on rows in the table + * @return this for method chaining + */ + StreamingUpdate conflictDetectionFilter(Expression conflictDetectionFilter); + + /** + * Enables validation that data files added concurrently do not conflict with this commit's + * operation. + * + *
This method should be called when the table is queried to determine which files to + * delete/append. If a concurrent operation commits a new file after the data was read and that + * file might contain rows matching the specified conflict detection filter, this operation will + * detect this during retries and fail. + * + *
Calling this method is required to maintain serializable isolation for update/delete + * operations. Otherwise, the isolation level will be snapshot isolation. + * + *
Validation uses the conflict detection filter passed to {@link + * #conflictDetectionFilter(Expression)} and applies to operations that happened after the + * snapshot passed to {@link #validateFromSnapshot(long)}. + * + * @return this for method chaining + */ + StreamingUpdate validateNoConflictingDataFiles(); + + /** + * Enables validation that delete files added concurrently do not conflict with this commit's + * operation. + * + *
This method must be called when the table is queried to produce a row delta for UPDATE and + * MERGE operations independently of the isolation level. Calling this method isn't required for + * DELETE operations as it is OK to delete a record that is also deleted concurrently. + * + *
Validation uses the conflict detection filter passed to {@link
+ * #conflictDetectionFilter(Expression)} and applies to operations that happened after the
+ * snapshot passed to {@link #validateFromSnapshot(long)}.
+ *
+ * @return this for method chaining
+ */
+ StreamingUpdate validateNoConflictingDeleteFiles();
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java b/core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java
index 2b86e56e7367..2e9bf4dfe970 100644
--- a/core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/BaseStreamingUpdate.java
@@ -20,13 +20,22 @@
import java.util.List;
import java.util.Set;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.SnapshotUtil;
class BaseStreamingUpdate extends MergingSnapshotProducer