Skip to content

Commit 96f87bf

Browse files
authored
[Kernel][Writes] Support idempotent writes (#3051)
## Description (Split from #2944) Adds an API on `TransactionBuilder` to take the transaction identifier for idempotent writes ``` /* * Set the transaction identifier for idempotent writes. Incremental processing systems (e.g., * streaming systems) that track progress using their own application-specific versions need to * record what progress has been made, in order to avoid duplicating data in the face of * failures and retries during writes. By setting the transaction identifier, the Delta table * can ensure that the data with same identifier is not written multiple times. For more * information refer to the Delta protocol section <a * href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#transaction-identifiers"> * Transaction Identifiers</a>. * * @param engine {@link Engine} instance to use. * @param applicationId The application ID that is writing to the table. * @param transactionVersion The version of the transaction. This should be monotonically * increasing with each write for the same application ID. * @return updated {@link TransactionBuilder} instance. */ TransactionBuilder withTransactionId( Engine engine, String applicationId, long transactionVersion); ``` During the transaction build, check the latest txn version of the given AppId. If it is not monotonically increasing throw `ConcurrentTransactionException`. ## How was this patch tested? Added to `DeltaTableWriteSuite.scala`
1 parent 91cd61a commit 96f87bf

File tree

7 files changed

+186
-6
lines changed

7 files changed

+186
-6
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/TransactionBuilder.java

+26-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import io.delta.kernel.annotation.Evolving;
2121
import io.delta.kernel.engine.Engine;
22+
import io.delta.kernel.exceptions.ConcurrentTransactionException;
2223
import io.delta.kernel.types.StructType;
2324

2425
/**
@@ -48,10 +49,33 @@ public interface TransactionBuilder {
4849
TransactionBuilder withPartitionColumns(Engine engine, List<String> partitionColumns);
4950

5051
/**
51-
* Build the transaction. Also validates the given info to ensure that a valida transaction
52-
* can be created.
52+
* Set the transaction identifier for idempotent writes. Incremental processing systems (e.g.,
53+
* streaming systems) that track progress using their own application-specific versions need to
54+
* record what progress has been made, in order to avoid duplicating data in the face of
55+
* failures and retries during writes. By setting the transaction identifier, the Delta table
56+
* can ensure that the data with same identifier is not written multiple times. For more
57+
* information refer to the Delta protocol section <a
58+
* href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#transaction-identifiers">
59+
* Transaction Identifiers</a>.
60+
*
61+
* @param engine {@link Engine} instance to use.
62+
* @param applicationId The application ID that is writing to the table.
63+
* @param transactionVersion The version of the transaction. This should be monotonically
64+
* increasing with each write for the same application ID.
65+
* @return updated {@link TransactionBuilder} instance.
66+
*/
67+
TransactionBuilder withTransactionId(
68+
Engine engine,
69+
String applicationId,
70+
long transactionVersion);
71+
72+
/**
73+
* Build the transaction. Also validates the given info to ensure that a valid transaction can
74+
* be created.
5375
*
5476
* @param engine {@link Engine} instance to use.
77+
* @throws ConcurrentTransactionException if the table already has a committed transaction with
78+
* the same given transaction identifier.
5579
*/
5680
Transaction build(Engine engine);
5781
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright (2024) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.exceptions;
17+
18+
import io.delta.kernel.TransactionBuilder;
19+
import io.delta.kernel.annotation.Evolving;
20+
import io.delta.kernel.engine.Engine;
21+
22+
/**
23+
* Thrown when concurrent transaction both attempt to update the table with same transaction
24+
* identifier set through {@link TransactionBuilder#withTransactionId(Engine, String, long)}
25+
* (String)}.
26+
* <p>
27+
* Incremental processing systems (e.g., streaming systems) that track progress using their own
28+
* application-specific versions need to record what progress has been made, in order to avoid
29+
* duplicating data in the face of failures and retries during writes. For more information refer to
30+
* the Delta protocol section <a
31+
* href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#transaction-identifiers">
32+
* Transaction Identifiers</a>
33+
*
34+
* @since 3.2.0
35+
*/
36+
@Evolving
37+
public class ConcurrentTransactionException extends ConcurrentWriteException {
38+
private static final String message = "This error occurs when multiple updates are " +
39+
"using the same transaction identifier to write into this table.\n" +
40+
"Application ID: %s, Attempted version: %s, Latest version in table: %s";
41+
42+
public ConcurrentTransactionException(String appId, long txnVersion, long lastUpdated) {
43+
super(String.format(message, appId, txnVersion, lastUpdated));
44+
}
45+
}

kernel/kernel-api/src/main/java/io/delta/kernel/exceptions/ConcurrentWriteException.java

+4
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,8 @@ public ConcurrentWriteException() {
2828
super("Transaction has encountered a conflict and can not be committed. " +
2929
"Query needs to be re-executed using the latest version of the table.");
3030
}
31+
32+
public ConcurrentWriteException(String message) {
33+
super(message);
34+
}
3135
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java

+7
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,13 @@ public static KernelException partitionColumnMissingInData(
188188
return new KernelException(format(msgT, partitionColumn, tablePath));
189189
}
190190

191+
public static KernelException concurrentTransaction(
192+
String appId,
193+
long txnVersion,
194+
long lastUpdated) {
195+
return new ConcurrentTransactionException(appId, txnVersion, lastUpdated);
196+
}
197+
191198
/* ------------------------ HELPER METHODS ----------------------------- */
192199
private static String formatTimestamp(long millisSinceEpochUTC) {
193200
return new Timestamp(millisSinceEpochUTC).toInstant().toString();

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java

+27-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.delta.kernel.internal;
1717

1818
import java.util.*;
19+
import static java.util.Objects.requireNonNull;
1920

2021
import org.slf4j.Logger;
2122
import org.slf4j.LoggerFactory;
@@ -49,6 +50,7 @@ public class TransactionBuilderImpl implements TransactionBuilder {
4950
private final Operation operation;
5051
private Optional<StructType> schema = Optional.empty();
5152
private Optional<List<String>> partitionColumns = Optional.empty();
53+
private Optional<SetTransaction> setTxnOpt = Optional.empty();
5254

5355
public TransactionBuilderImpl(TableImpl table, String engineInfo, Operation operation) {
5456
this.table = table;
@@ -70,6 +72,19 @@ public TransactionBuilder withPartitionColumns(Engine engine, List<String> parti
7072
return this;
7173
}
7274

75+
@Override
76+
public TransactionBuilder withTransactionId(
77+
Engine engine,
78+
String applicationId,
79+
long transactionVersion) {
80+
SetTransaction txnId = new SetTransaction(
81+
requireNonNull(applicationId, "applicationId is null"),
82+
transactionVersion,
83+
Optional.of(currentTimeMillis));
84+
this.setTxnOpt = Optional.of(txnId);
85+
return this;
86+
}
87+
7388
@Override
7489
public Transaction build(Engine engine) {
7590
SnapshotImpl snapshot;
@@ -97,7 +112,8 @@ public Transaction build(Engine engine) {
97112
engineInfo,
98113
operation,
99114
snapshot.getProtocol(),
100-
snapshot.getMetadata());
115+
snapshot.getMetadata(),
116+
setTxnOpt);
101117
}
102118

103119
/**
@@ -131,6 +147,16 @@ private void validate(Engine engine, SnapshotImpl snapshot, boolean isNewTable)
131147
SchemaUtils.validatePartitionColumns(
132148
schema.get(), partitionColumns.orElse(Collections.emptyList()));
133149
}
150+
151+
setTxnOpt.ifPresent(txnId -> {
152+
Optional<Long> lastTxnVersion = snapshot.getLatestTransactionVersion(txnId.getAppId());
153+
if (lastTxnVersion.isPresent() && lastTxnVersion.get() >= txnId.getVersion()) {
154+
throw DeltaErrors.concurrentTransaction(
155+
txnId.getAppId(),
156+
txnId.getVersion(),
157+
lastTxnVersion.get());
158+
}
159+
});
134160
}
135161

136162
private class InitialSnapshot extends SnapshotImpl {

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public class TransactionImpl
5454
private final Protocol protocol;
5555
private final Metadata metadata;
5656
private final SnapshotImpl readSnapshot;
57+
private final Optional<SetTransaction> setTxnOpt;
5758

5859
private boolean closed; // To avoid trying to commit the same transaction again.
5960

@@ -65,7 +66,8 @@ public TransactionImpl(
6566
String engineInfo,
6667
Operation operation,
6768
Protocol protocol,
68-
Metadata metadata) {
69+
Metadata metadata,
70+
Optional<SetTransaction> setTxnOpt) {
6971
this.isNewTable = isNewTable;
7072
this.dataPath = dataPath;
7173
this.logPath = logPath;
@@ -74,6 +76,7 @@ public TransactionImpl(
7476
this.operation = operation;
7577
this.protocol = protocol;
7678
this.metadata = metadata;
79+
this.setTxnOpt = setTxnOpt;
7780
}
7881

7982
@Override
@@ -106,6 +109,7 @@ public TransactionCommitResult commit(
106109
metadataActions.add(createMetadataSingleAction(metadata.toRow()));
107110
metadataActions.add(createProtocolSingleAction(protocol.toRow()));
108111
}
112+
setTxnOpt.ifPresent(setTxn -> metadataActions.add(createTxnSingleAction(setTxn.toRow())));
109113

110114
try (CloseableIterator<Row> stageDataIter = dataActions.iterator()) {
111115
// Create a new CloseableIterator that will return the metadata actions followed by the

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala

+72-2
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import io.delta.kernel.defaults.internal.data.DefaultColumnarBatch
2424
import io.delta.kernel.defaults.internal.parquet.ParquetSuiteBase
2525
import io.delta.kernel.defaults.utils.TestRow
2626
import io.delta.kernel.engine.Engine
27-
import io.delta.kernel.exceptions.{KernelException, TableAlreadyExistsException, TableNotFoundException}
27+
import io.delta.kernel.exceptions._
2828
import io.delta.kernel.expressions.Literal
2929
import io.delta.kernel.expressions.Literal._
3030
import io.delta.kernel.internal.checkpoints.CheckpointerSuite.selectSingleElement
@@ -430,7 +430,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
430430
val expV0Data = v0Part0Data.flatMap(_.toTestRows) ++ v0Part1Data.flatMap(_.toTestRows)
431431
val expV1Data = v1Part0Data.flatMap(_.toTestRows) ++ v1Part1Data.flatMap(_.toTestRows)
432432

433-
for(i <- 0 until 2) {
433+
for (i <- 0 until 2) {
434434
val commitResult = appendData(
435435
engine,
436436
tblPath,
@@ -673,6 +673,76 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
673673
}
674674
}
675675

676+
test("insert into table - idempotent writes") {
677+
withTempDirAndEngine { (tblPath, engine) =>
678+
val data = Seq(Map("part1" -> ofInt(1), "part2" -> ofInt(2)) -> dataPartitionBatches1)
679+
var expData = Seq.empty[TestRow] // as the data in inserted, update this.
680+
681+
def addDataWithTxnId(newTbl: Boolean, appId: String, txnVer: Long, expTblVer: Long): Unit = {
682+
var txnBuilder = createWriteTxnBuilder(Table.forPath(engine, tblPath))
683+
684+
if (appId != null) txnBuilder = txnBuilder.withTransactionId(engine, appId, txnVer)
685+
686+
if (newTbl) {
687+
txnBuilder = txnBuilder.withSchema(engine, testPartitionSchema)
688+
.withPartitionColumns(engine, testPartitionColumns.asJava)
689+
}
690+
val txn = txnBuilder.build(engine)
691+
692+
val combinedActions = inMemoryIterable(
693+
data.map { case (partValues, partData) =>
694+
stageData(txn.getTransactionState(engine), partValues, partData)
695+
}.reduceLeft(_ combine _))
696+
697+
val commitResult = txn.commit(engine, combinedActions)
698+
699+
expData = expData ++ data.flatMap(_._2).flatMap(_.toTestRows)
700+
701+
verifyCommitResult(commitResult, expVersion = expTblVer, expIsReadyForCheckpoint = false)
702+
val expPartCols = if (newTbl) testPartitionColumns else null
703+
verifyCommitInfo(tblPath, version = expTblVer, expPartCols, operation = WRITE)
704+
verifyWrittenContent(tblPath, testPartitionSchema, expData)
705+
}
706+
707+
def expFailure(appId: String, txnVer: Long, latestTxnVer: Long)(fn: => Any): Unit = {
708+
val ex = intercept[ConcurrentTransactionException] {
709+
fn
710+
}
711+
assert(ex.getMessage.contains(s"This error occurs when multiple updates are using the " +
712+
s"same transaction identifier to write into this table.\nApplication ID: $appId, " +
713+
s"Attempted version: $txnVer, Latest version in table: $latestTxnVer"))
714+
}
715+
716+
// Create a transaction with id (txnAppId1, 0) and commit it
717+
addDataWithTxnId(newTbl = true, appId = "txnAppId1", txnVer = 0, expTblVer = 0)
718+
719+
// Try to create a transaction with id (txnAppId1, 0) and commit it - should be valid
720+
addDataWithTxnId(newTbl = false, appId = "txnAppId1", txnVer = 1, expTblVer = 1)
721+
722+
// Try to create a transaction with id (txnAppId1, 1) and try to commit it
723+
// Should fail the it is already committed above.
724+
expFailure("txnAppId1", txnVer = 1, latestTxnVer = 1) {
725+
addDataWithTxnId(newTbl = false, "txnAppId1", txnVer = 1, expTblVer = 2)
726+
}
727+
728+
// append with no txn id
729+
addDataWithTxnId(newTbl = false, appId = null, txnVer = 0, expTblVer = 2)
730+
731+
// Try to create a transaction with id (txnAppId2, 1) and commit it
732+
// Should be successful as the transaction app id is different
733+
addDataWithTxnId(newTbl = false, "txnAppId2", txnVer = 1, expTblVer = 3)
734+
735+
// Try to create a transaction with id (txnAppId2, 0) and commit it
736+
// Should fail as the transaction app id is same but the version is less than the committed
737+
expFailure("txnAppId2", txnVer = 0, latestTxnVer = 1) {
738+
addDataWithTxnId(newTbl = false, "txnAppId2", txnVer = 0, expTblVer = 4)
739+
}
740+
741+
// TODO: Add a test case where there are concurrent transactions with same app id
742+
// and only one of them succeeds. Will be added once conflict resolution is handled
743+
}
744+
}
745+
676746
def verifyWrittenContent(path: String, expSchema: StructType, expData: Seq[TestRow]): Unit = {
677747
val actSchema = tableSchema(path)
678748
assert(actSchema === expSchema)

0 commit comments

Comments
 (0)