20
20
import java .util .*;
21
21
import java .util .stream .Collectors ;
22
22
23
+ import org .slf4j .Logger ;
24
+ import org .slf4j .LoggerFactory ;
25
+
23
26
import io .delta .kernel .*;
24
27
import io .delta .kernel .data .Row ;
25
28
import io .delta .kernel .engine .Engine ;
32
35
import io .delta .kernel .internal .actions .*;
33
36
import io .delta .kernel .internal .data .TransactionStateRow ;
34
37
import io .delta .kernel .internal .fs .Path ;
38
+ import io .delta .kernel .internal .replay .ConflictChecker ;
39
+ import io .delta .kernel .internal .replay .ConflictChecker .TransactionRebaseState ;
35
40
import io .delta .kernel .internal .util .FileNames ;
36
41
import io .delta .kernel .internal .util .VectorUtils ;
37
42
import static io .delta .kernel .internal .TableConfig .CHECKPOINT_INTERVAL ;
38
43
import static io .delta .kernel .internal .actions .SingleAction .*;
44
+ import static io .delta .kernel .internal .util .Preconditions .checkArgument ;
39
45
import static io .delta .kernel .internal .util .Preconditions .checkState ;
40
46
import static io .delta .kernel .internal .util .Utils .toCloseableIterator ;
41
47
42
48
public class TransactionImpl
43
49
implements Transaction {
50
+ private static final Logger logger = LoggerFactory .getLogger (TransactionImpl .class );
51
+
44
52
public static final int DEFAULT_READ_VERSION = 1 ;
45
53
public static final int DEFAULT_WRITE_VERSION = 2 ;
46
54
55
+ /**
56
+ * Number of retries for concurrent write exceptions to resolve conflicts and retry commit. In
57
+ * Delta-Spark, for historical reasons the number of retries is really high (10m). We are
58
+ * starting with a lower number for now. If this is not sufficient we can update it.
59
+ */
60
+ private static final int NUM_TXN_RETRIES = 200 ;
61
+
47
62
private final UUID txnId = UUID .randomUUID ();
48
63
49
64
private final boolean isNewTable ; // the transaction is creating a new table
@@ -95,13 +110,45 @@ public StructType getSchema(Engine engine) {
95
110
}
96
111
97
112
@ Override
98
- public TransactionCommitResult commit (
113
+ public TransactionCommitResult commit (Engine engine , CloseableIterable <Row > dataActions )
114
+ throws ConcurrentWriteException {
115
+ try {
116
+ checkState (!closed ,
117
+ "Transaction is already attempted to commit. Create a new transaction." );
118
+
119
+ long commitAsVersion = readSnapshot .getVersion (engine ) + 1 ;
120
+ int numRetries = 0 ;
121
+ do {
122
+ logger .info ("Committing transaction as version = {}." , commitAsVersion );
123
+ try {
124
+ return doCommit (engine , commitAsVersion , dataActions );
125
+ } catch (FileAlreadyExistsException fnfe ) {
126
+ logger .info ("Concurrent write detected when committing as version = {}. " +
127
+ "Trying to resolve conflicts and retry commit." , commitAsVersion );
128
+ TransactionRebaseState rebaseState = ConflictChecker
129
+ .resolveConflicts (engine , readSnapshot , commitAsVersion , this );
130
+ long newCommitAsVersion = rebaseState .getLatestVersion () + 1 ;
131
+ checkArgument (commitAsVersion < newCommitAsVersion ,
132
+ "New commit version %d should be greater than the previous commit " +
133
+ "attempt version %d." , newCommitAsVersion , commitAsVersion );
134
+ commitAsVersion = newCommitAsVersion ;
135
+ }
136
+ numRetries ++;
137
+ } while (numRetries < NUM_TXN_RETRIES );
138
+ } finally {
139
+ closed = true ;
140
+ }
141
+
142
+ // we have exhausted the number of retries, give up.
143
+ logger .info ("Exhausted maximum retries ({}) for committing transaction." , NUM_TXN_RETRIES );
144
+ throw new ConcurrentWriteException ();
145
+ }
146
+
147
+ private TransactionCommitResult doCommit (
99
148
Engine engine ,
149
+ long commitAsVersion ,
100
150
CloseableIterable <Row > dataActions )
101
- throws ConcurrentWriteException {
102
- checkState (
103
- !closed ,
104
- "Transaction is already attempted to commit. Create a new transaction." );
151
+ throws FileAlreadyExistsException {
105
152
List <Row > metadataActions = new ArrayList <>();
106
153
metadataActions .add (createCommitInfoSingleAction (generateCommitAction ()));
107
154
if (isNewTable ) {
@@ -117,35 +164,40 @@ public TransactionCommitResult commit(
117
164
CloseableIterator <Row > dataAndMetadataActions =
118
165
toCloseableIterator (metadataActions .iterator ()).combine (stageDataIter );
119
166
120
- try {
121
- long readVersion = readSnapshot .getVersion (engine );
122
- if (readVersion == -1 ) {
123
- // New table, create a delta log directory
124
- if (!engine .getFileSystemClient ().mkdirs (logPath .toString ())) {
125
- throw new RuntimeException (
126
- "Failed to create delta log directory: " + logPath );
127
- }
167
+ if (commitAsVersion == 0 ) {
168
+ // New table, create a delta log directory
169
+ if (!engine .getFileSystemClient ().mkdirs (logPath .toString ())) {
170
+ throw new RuntimeException (
171
+ "Failed to create delta log directory: " + logPath );
128
172
}
129
-
130
- long newVersion = readVersion + 1 ;
131
- // Write the staged data to a delta file
132
- engine .getJsonHandler ().writeJsonFileAtomically (
133
- FileNames .deltaFile (logPath , newVersion ),
134
- dataAndMetadataActions ,
135
- false /* overwrite */ );
136
-
137
- return new TransactionCommitResult (newVersion , isReadyForCheckpoint (newVersion ));
138
- } catch (FileAlreadyExistsException e ) {
139
- // TODO: Resolve conflicts and retry commit
140
- throw new ConcurrentWriteException ();
141
173
}
174
+
175
+ // Write the staged data to a delta file
176
+ engine .getJsonHandler ().writeJsonFileAtomically (
177
+ FileNames .deltaFile (logPath , commitAsVersion ),
178
+ dataAndMetadataActions ,
179
+ false /* overwrite */ );
180
+
181
+ return new TransactionCommitResult (
182
+ commitAsVersion ,
183
+ isReadyForCheckpoint (commitAsVersion ));
184
+ } catch (FileAlreadyExistsException e ) {
185
+ throw e ;
142
186
} catch (IOException ioe ) {
143
187
throw new RuntimeException (ioe );
144
- } finally {
145
- closed = true ;
146
188
}
147
189
}
148
190
191
+ public boolean isBlindAppend () {
192
+ // For now, Kernel just supports blind append.
193
+ // Change this when read-after-write is supported.
194
+ return true ;
195
+ }
196
+
197
+ public Optional <SetTransaction > getSetTxnOpt () {
198
+ return setTxnOpt ;
199
+ }
200
+
149
201
private Row generateCommitAction () {
150
202
return new CommitInfo (
151
203
System .currentTimeMillis (), /* timestamp */
@@ -162,12 +214,6 @@ private boolean isReadyForCheckpoint(long newVersion) {
162
214
return newVersion > 0 && newVersion % checkpointInterval == 0 ;
163
215
}
164
216
165
- private boolean isBlindAppend () {
166
- // For now, Kernel just supports blind append.
167
- // Change this when read-after-write is supported.
168
- return true ;
169
- }
170
-
171
217
private Map <String , String > getOperationParameters () {
172
218
if (isNewTable ) {
173
219
List <String > partitionCols = VectorUtils .toJavaList (metadata .getPartitionColumns ());
@@ -182,7 +228,7 @@ private Map<String, String> getOperationParameters() {
182
228
/**
183
229
* Get the part of the schema of the table that needs the statistics to be collected per file.
184
230
*
185
- * @param engine {@link Engine} instance to use.
231
+ * @param engine {@link Engine} instance to use.
186
232
* @param transactionState State of the transaction
187
233
* @return
188
234
*/
0 commit comments