Skip to content

Commit ccb8bd7

Browse files
committed
[Kernel][Writes] Allow transcation retries for blind append
1 parent 96f87bf commit ccb8bd7

File tree

6 files changed

+327
-6
lines changed

6 files changed

+327
-6
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright (2024) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.exceptions;
17+
18+
import io.delta.kernel.annotation.Evolving;
19+
20+
/**
21+
* Thrown when the metadata of the Delta table has changed between the time of transaction start
22+
* and the time of commit.
23+
*
24+
* @since 3.2.0
25+
*/
26+
@Evolving
27+
public class MetadataChangedException extends ConcurrentWriteException {
28+
public MetadataChangedException() {
29+
super("Transaction has encountered a conflict and can not be committed. " +
30+
"Query needs to be re-executed using the latest version of the table.");
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright (2024) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.exceptions;
17+
18+
import io.delta.kernel.annotation.Evolving;
19+
20+
/**
21+
* Thrown when the protocol of the Delta table has changed between the time of transaction start
22+
* and the time of commit.
23+
*
24+
* @since 3.2.0
25+
*/
26+
@Evolving
27+
public class ProtocolChangedException extends ConcurrentWriteException {
28+
public ProtocolChangedException() {
29+
super("Transaction has encountered a conflict and can not be committed. " +
30+
"Query needs to be re-executed using the latest version of the table.");
31+
}
32+
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java

+8
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,14 @@ public static KernelException concurrentTransaction(
195195
return new ConcurrentTransactionException(appId, txnVersion, lastUpdated);
196196
}
197197

198+
public static KernelException metadataChangedException() {
199+
return new MetadataChangedException();
200+
}
201+
202+
public static KernelException protocolChangedException() {
203+
return new ProtocolChangedException();
204+
}
205+
198206
/* ------------------------ HELPER METHODS ----------------------------- */
199207
private static String formatTimestamp(long millisSinceEpochUTC) {
200208
return new Timestamp(millisSinceEpochUTC).toInstant().toString();

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,16 @@ public TransactionCommitResult commit(
146146
}
147147
}
148148

149+
public boolean isBlindAppend() {
150+
// For now, Kernel just supports blind append.
151+
// Change this when read-after-write is supported.
152+
return true;
153+
}
154+
155+
public Optional<SetTransaction> getSetTxnOpt() {
156+
return setTxnOpt;
157+
}
158+
149159
private Row generateCommitAction() {
150160
return new CommitInfo(
151161
System.currentTimeMillis(), /* timestamp */
@@ -162,12 +172,6 @@ private boolean isReadyForCheckpoint(long newVersion) {
162172
return newVersion > 0 && newVersion % checkpointInterval == 0;
163173
}
164174

165-
private boolean isBlindAppend() {
166-
// For now, Kernel just supports blind append.
167-
// Change this when read-after-write is supported.
168-
return true;
169-
}
170-
171175
private Map<String, String> getOperationParameters() {
172176
if (isNewTable) {
173177
List<String> partitionCols = VectorUtils.toJavaList(metadata.getPartitionColumns());

kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/SingleAction.java

+14
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,20 @@ public class SingleAction {
3737
// Once we start supporting updating CDC or domain metadata enabled tables, we should add the
3838
// schema for those fields here.
3939

40+
/**
41+
* Schema to use when reading the winning commit files for conflict resolution. This schema
42+
* is just for resolving conflicts when doing a blind append. It doesn't cover case when the
43+
* txn is reading data from the table and updating the table.
44+
*/
45+
public static StructType CONFLICT_RESOLUTION_SCHEMA = new StructType()
46+
.add("txn", SetTransaction.FULL_SCHEMA)
47+
// .add("add", AddFile.FULL_SCHEMA) // not needed for blind appends
48+
// .add("remove", RemoveFile.FULL_SCHEMA) // not needed for blind appends
49+
.add("metaData", Metadata.FULL_SCHEMA)
50+
.add("protocol", Protocol.FULL_SCHEMA);
51+
// Once we start supporting domain metadata/row tracking enabled tables, we should add the
52+
// schema for domain metadata fields here.
53+
4054
// Schema to use when writing out the single action to the Delta Log.
4155
public static StructType FULL_SCHEMA = new StructType()
4256
.add("txn", SetTransaction.FULL_SCHEMA)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
/*
2+
* Copyright (2023) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.internal.replay;
17+
18+
import java.io.*;
19+
import java.util.*;
20+
import static java.lang.String.format;
21+
22+
import io.delta.kernel.data.ColumnVector;
23+
import io.delta.kernel.data.ColumnarBatch;
24+
import io.delta.kernel.engine.Engine;
25+
import io.delta.kernel.exceptions.ConcurrentWriteException;
26+
import io.delta.kernel.utils.CloseableIterator;
27+
import io.delta.kernel.utils.FileStatus;
28+
29+
import io.delta.kernel.internal.*;
30+
import io.delta.kernel.internal.actions.SetTransaction;
31+
import io.delta.kernel.internal.util.FileNames;
32+
import io.delta.kernel.internal.util.Preconditions;
33+
import static io.delta.kernel.internal.actions.SingleAction.CONFLICT_RESOLUTION_SCHEMA;
34+
import static io.delta.kernel.internal.util.FileNames.deltaFile;
35+
import static io.delta.kernel.internal.util.Preconditions.checkState;
36+
37+
/**
38+
* Class containing the conflict resolution logic when writing to a Delta table.
39+
* <p>
40+
* Currently, the support is to allow blind appends. Later on this can be extended to add support
41+
* for read-after-write scenarios.
42+
*/
43+
public class ConflictChecker {
44+
private static final int PROTOCOL_ORDINAL = CONFLICT_RESOLUTION_SCHEMA.indexOf("protocol");
45+
private static final int METADATA_ORDINAL = CONFLICT_RESOLUTION_SCHEMA.indexOf("metaData");
46+
private static final int TXN_ORDINAL = CONFLICT_RESOLUTION_SCHEMA.indexOf("txn");
47+
48+
// Snapshot of the table read by the transaction that encountered the conflict
49+
// (a.k.a the losing transaction)
50+
private final SnapshotImpl snapshot;
51+
52+
// Losing transaction
53+
private final TransactionImpl transaction;
54+
55+
private ConflictChecker(SnapshotImpl snapshot, TransactionImpl transaction) {
56+
this.snapshot = snapshot;
57+
this.transaction = transaction;
58+
}
59+
60+
/**
61+
* Resolve conflicts between the losing transaction and the winning transactions and return a
62+
* rebase state that the losing transaction needs to rebase against before attempting the
63+
* commit.
64+
*
65+
* @param engine {@link Engine} instance to use
66+
* @param snapshot {@link SnapshotImpl} of the table when the losing transaction has started
67+
* @param transaction {@link TransactionImpl} that encountered the conflict (a.k.a the losing
68+
* transaction)
69+
* @return {@link TransactionRebaseState} that the losing transaction needs to rebase against
70+
* @throws ConcurrentWriteException if there are logical conflicts between the losing
71+
* transaction and the winning transactions that cannot be
72+
* resolved.
73+
*/
74+
public static TransactionRebaseState resolveConflicts(
75+
Engine engine,
76+
SnapshotImpl snapshot,
77+
TransactionImpl transaction) throws ConcurrentWriteException {
78+
return new ConflictChecker(snapshot, transaction).resolveConflicts(engine);
79+
}
80+
81+
public TransactionRebaseState resolveConflicts(Engine engine) throws ConcurrentWriteException {
82+
List<FileStatus> winningCommits = getWinningCommitFiles(engine);
83+
84+
// no winning commits. why did we get the transaction conflict?
85+
checkState(!winningCommits.isEmpty(), "No winning commits found.");
86+
87+
// Read the actions from the winning commits
88+
try (ActionsIterator actionsIterator = new ActionsIterator(
89+
engine,
90+
winningCommits,
91+
CONFLICT_RESOLUTION_SCHEMA,
92+
Optional.empty())) {
93+
94+
while (actionsIterator.hasNext()) {
95+
ActionWrapper action = actionsIterator.next();
96+
Preconditions.checkArgument(!action.isFromCheckpoint());
97+
ColumnarBatch batch = action.getColumnarBatch();
98+
99+
ColumnVector protocolVector = batch.getColumnVector(PROTOCOL_ORDINAL);
100+
ColumnVector metadataVector = batch.getColumnVector(METADATA_ORDINAL);
101+
ColumnVector txnVector = batch.getColumnVector(TXN_ORDINAL);
102+
103+
handleProtocol(protocolVector);
104+
handleMetadata(metadataVector);
105+
handleTxn(txnVector);
106+
}
107+
} catch (IOException ioe) {
108+
throw new UncheckedIOException("Error reading actions from winning commits.", ioe);
109+
}
110+
111+
// if we get here, we have successfully rebased against the winning transactions
112+
return new TransactionRebaseState(getLastWinningTxnVersion(winningCommits));
113+
}
114+
115+
/**
116+
* Class containing the rebase state from winning transactions that the current transaction
117+
* needs to rebase against before attempting the commit.
118+
* <p>
119+
* Currently, the rebase state is just the latest winning version of the table. In future once
120+
* we start supporting read-after-write, domain metadata, row tracking, etc., we will have more
121+
* state to add. For example read-after-write will need to know the files deleted in the winning
122+
* transactions to make sure the same files are not deleted by the current (losing)
123+
* transaction.
124+
*/
125+
public static class TransactionRebaseState {
126+
private final long latestVersion;
127+
128+
public TransactionRebaseState(long latestVersion) {
129+
this.latestVersion = latestVersion;
130+
}
131+
132+
/**
133+
* Return the latest winning version of the table.
134+
*
135+
* @return latest winning version of the table.
136+
*/
137+
public long getLatestVersion() {
138+
return latestVersion;
139+
}
140+
}
141+
142+
/**
143+
* Any protocol changes between the losing transaction and the winning transactions are not
144+
* allowed. In future once we start supporting more table features on the write side, this can
145+
* be changed to handle safe protocol changes. For now the write support in Kernel is supported
146+
* at a very first version of the protocol.
147+
*
148+
* @param protocolVector protocol rows from the winning transactions
149+
*/
150+
private void handleProtocol(ColumnVector protocolVector) {
151+
for (int rowId = 0; rowId < protocolVector.getSize(); rowId++) {
152+
if (!protocolVector.isNullAt(rowId)) {
153+
throw DeltaErrors.protocolChangedException();
154+
}
155+
}
156+
}
157+
158+
/**
159+
* Any metadata changes between the losing transaction and the winning transactions are not
160+
* allowed.
161+
*
162+
* @param metadataVector metadata rows from the winning transactions
163+
*/
164+
private void handleMetadata(ColumnVector metadataVector) {
165+
for (int rowId = 0; rowId < metadataVector.getSize(); rowId++) {
166+
if (!metadataVector.isNullAt(rowId)) {
167+
throw DeltaErrors.metadataChangedException();
168+
}
169+
}
170+
}
171+
172+
private void handleTxn(ColumnVector txnVector) {
173+
// Check if the losing transaction has any txn identifier. If it does, go through the
174+
// winning transactions and make sure that the losing transaction is valid from a
175+
// idempotent perspective.
176+
Optional<SetTransaction> losingTxnIdOpt = transaction.getSetTxnOpt();
177+
losingTxnIdOpt.ifPresent(losingTxnId -> {
178+
for (int rowId = 0; rowId < txnVector.getSize(); rowId++) {
179+
SetTransaction winningTxn = SetTransaction.fromColumnVector(txnVector, rowId);
180+
if (winningTxn != null && winningTxn.getVersion() >= losingTxnId.getVersion()) {
181+
throw DeltaErrors.concurrentTransaction(
182+
losingTxnId.getAppId(),
183+
losingTxnId.getVersion(),
184+
winningTxn.getVersion());
185+
}
186+
}
187+
});
188+
}
189+
190+
private List<FileStatus> getWinningCommitFiles(Engine engine) {
191+
String firstWinningCommitFile =
192+
deltaFile(snapshot.getLogPath(), snapshot.getVersion(engine) + 1);
193+
194+
try (CloseableIterator<FileStatus> files = engine.getFileSystemClient()
195+
.listFrom(firstWinningCommitFile)) {
196+
// Filter out all winning transaction commit files.
197+
List<FileStatus> winningCommitFiles = new ArrayList<>();
198+
while (files.hasNext()) {
199+
FileStatus file = files.next();
200+
if (FileNames.isCommitFile(file.getPath())) {
201+
winningCommitFiles.add(files.next());
202+
}
203+
}
204+
205+
return ensureNoGapsInWinningCommits(winningCommitFiles);
206+
} catch (FileNotFoundException nfe) {
207+
// no winning commits. why did we get here?
208+
throw new IllegalStateException("No winning commits found.", nfe);
209+
} catch (IOException ioe) {
210+
throw new UncheckedIOException(
211+
"Error listing files from " + firstWinningCommitFile, ioe);
212+
}
213+
}
214+
215+
private long getLastWinningTxnVersion(List<FileStatus> winningCommits) {
216+
FileStatus lastWinningTxn = winningCommits.get(winningCommits.size() - 1);
217+
return FileNames.deltaVersion(lastWinningTxn.getPath());
218+
}
219+
220+
private static List<FileStatus> ensureNoGapsInWinningCommits(List<FileStatus> winningCommits) {
221+
long lastVersion = -1;
222+
for (FileStatus commit : winningCommits) {
223+
long version = FileNames.deltaVersion(commit.getPath());
224+
checkState(lastVersion == -1 || version == lastVersion + 1,
225+
format("Gaps in Delta log commit files. Expected version %d but got %d",
226+
(lastVersion + 1), version));
227+
lastVersion = version;
228+
}
229+
return winningCommits;
230+
}
231+
}

0 commit comments

Comments
 (0)