Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit d8ae28b

Browse files
committedMay 1, 2024·
[Kernel] APIs for inserting data into a Delta table
1 parent 809b62b commit d8ae28b

26 files changed

+1680
-33
lines changed
 
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright (2023) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel;
17+
18+
import java.util.ConcurrentModificationException;
19+
20+
import io.delta.kernel.annotation.Evolving;
21+
22+
/**
23+
* Thrown when concurrent transaction both attempt to update the same idempotent transaction.
24+
*
25+
* @since 3.2.0
26+
*/
27+
@Evolving
28+
public class ConcurrentTransactionException extends ConcurrentModificationException {
29+
private static final String message = "This error occurs when multiple updates are " +
30+
"using the same transaction identifier to write into this table.\n" +
31+
"Application ID: %s, Attempted version: %s, Latest version in table: %s";
32+
33+
public ConcurrentTransactionException(String appId, long txnVersion, long lastUpdated) {
34+
super(String.format(message, appId, txnVersion, lastUpdated));
35+
}
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (2023) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel;
17+
18+
import java.util.ConcurrentModificationException;
19+
20+
import io.delta.kernel.annotation.Evolving;
21+
22+
/**
23+
* Thrown when a concurrent transaction has written data after the current transaction read the
24+
* table.
25+
*
26+
* @since 3.2.0
27+
*/
28+
@Evolving
29+
public class ConcurrentWriteException extends ConcurrentModificationException {
30+
public ConcurrentWriteException() {
31+
super("Transaction has encountered a conflict and can not be committed. " +
32+
"Query needs to be re-executed using the latest version of the table.");
33+
}
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright (2023) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel;
17+
18+
import java.util.List;
19+
import java.util.Map;
20+
import static java.util.Collections.unmodifiableList;
21+
import static java.util.Collections.unmodifiableMap;
22+
23+
import io.delta.kernel.annotation.Evolving;
24+
import io.delta.kernel.expressions.Column;
25+
import io.delta.kernel.expressions.Literal;
26+
27+
/**
28+
* Contains the context for writing data related to a partition to Delta table.
29+
*
30+
* @since 3.2.0
31+
*/
32+
@Evolving
33+
public class DataWriteContext {
34+
private final String targetDirectory;
35+
private final Map<String, Literal> partitionValues;
36+
private final List<Column> statsColumns;
37+
38+
/**
39+
* Creates a new instance of WriteContext.
40+
*
41+
* @param partitionPath fully qualified path of the target directory
42+
* @param partitionValues partition values
43+
* @param statsColumns schema of the statistics
44+
*/
45+
public DataWriteContext(
46+
String partitionPath,
47+
Map<String, Literal> partitionValues,
48+
List<Column> statsColumns) {
49+
this.targetDirectory = partitionPath;
50+
this.partitionValues = unmodifiableMap(partitionValues);
51+
this.statsColumns = unmodifiableList(statsColumns);
52+
}
53+
54+
/**
55+
* Returns the target directory where the data should be written.
56+
*
57+
* @return fully qualified path of the target directory
58+
*/
59+
public String getTargetDirectory() {
60+
return targetDirectory;
61+
}
62+
63+
/**
64+
* Returns the partition values for the data to be written.
65+
*
66+
* @return partition values
67+
*/
68+
public Map<String, Literal> getPartitionValues() {
69+
return partitionValues;
70+
}
71+
72+
/**
73+
* Returns the list of columns that need to statistics for the data to be written. Statistics
74+
* collections is optional, but when present can be used to optimize query performance.
75+
*
76+
* @return schema of the statistics
77+
*/
78+
public List<Column> getStatisticsColumns() {
79+
return statsColumns;
80+
}
81+
}

‎kernel/kernel-api/src/main/java/io/delta/kernel/Table.java

+15
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,21 @@ Snapshot getSnapshotAsOfVersion(TableClient tableClient, long versionId)
114114
Snapshot getSnapshotAsOfTimestamp(TableClient tableClient, long millisSinceEpochUTC)
115115
throws TableNotFoundException;
116116

117+
    /**
     * Create a {@link TransactionBuilder} which can create a {@link Transaction} object to mutate
     * the table.
     *
     * @param tableClient {@link TableClient} instance to use.
     * @param engineInfo  information about the engine that is making the updates.
     * @param operation   metadata of the operation that is being performed, e.g. "insert",
     *                    "delete". Recorded in the table's commit history.
     * @return {@link TransactionBuilder} instance to build the transaction.
     * @since 3.2.0
     */
    TransactionBuilder createTransactionBuilder(
        TableClient tableClient,
        String engineInfo,
        String operation);
131+
117132
/**
118133
* Checkpoint the table at given version. It writes a single checkpoint file.
119134
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
/*
2+
* Copyright (2023) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel;
17+
18+
import java.net.URI;
19+
import java.util.*;
20+
21+
import io.delta.kernel.annotation.Evolving;
22+
import io.delta.kernel.client.TableClient;
23+
import io.delta.kernel.data.*;
24+
import io.delta.kernel.expressions.Literal;
25+
import io.delta.kernel.types.StructType;
26+
import io.delta.kernel.utils.*;
27+
28+
import io.delta.kernel.internal.actions.AddFile;
29+
import io.delta.kernel.internal.actions.SingleAction;
30+
import io.delta.kernel.internal.fs.Path;
31+
import static io.delta.kernel.internal.TransactionImpl.getStatisticsColumns;
32+
import static io.delta.kernel.internal.data.TransactionStateRow.*;
33+
import static io.delta.kernel.internal.util.PartitionUtils.getTargetDirectory;
34+
import static io.delta.kernel.internal.util.PartitionUtils.validatePartitionValues;
35+
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
36+
37+
/**
 * Represents a transaction to mutate a Delta table.
 *
 * @since 3.2.0
 */
@Evolving
public interface Transaction {
    /**
     * Get the schema of the data that is being written to the table.
     */
    StructType getSchema(TableClient tableClient);

    /**
     * Get the list of logical names of the partition columns. This helps the connector to do
     * physical partitioning of the data before asking the Kernel to stage the data per partition.
     */
    List<String> getPartitionColumns(TableClient tableClient);

    /**
     * Get the state of the transaction. The state helps Kernel do the transformations to logical
     * data according to the Delta protocol and table features enabled on the table. The engine
     * should use this at the data writer task to transform the logical data that the engine wants
     * to write to the table in to physical data that goes in data files using
     * {@link Transaction#transformLogicalData(TableClient, Row, CloseableIterator, Map)}
     */
    Row getState(TableClient tableClient);

    /**
     * Commit the transaction including the staged data rows generated by
     * {@link Transaction#generateAppendActions}.
     *
     * @param tableClient {@link TableClient} instance.
     * @param stagedData  Iterable of data actions to commit. These data actions are generated by
     *                    the {@link Transaction#generateAppendActions(TableClient, Row,
     *                    CloseableIterator, DataWriteContext)}. The {@link CloseableIterable}
     *                    allows the Kernel to access the list of actions multiple times (in case
     *                    of retries to resolve the conflicts due to other writers to the table).
     * @return {@link TransactionCommitResult} status of the successful transaction.
     * @throws ConcurrentWriteException when the transaction has encountered a non-retryable
     *                                  conflict or the maximum number of retries has been
     *                                  reached. The connector needs to rerun the query on top of
     *                                  the latest table state and retry the transaction.
     */
    TransactionCommitResult commit(TableClient tableClient, CloseableIterable<Row> stagedData)
        throws ConcurrentWriteException;

    /**
     * Given the logical data that needs to be written to the table, convert it into the required
     * physical data depending upon the table Delta protocol and features enabled on the table.
     * Kernel takes care of adding any additional column or removing existing columns that doesn't
     * need to be in physical data files. All these transformations are driven by the Delta
     * protocol and table features enabled on the table.
     * <p>
     * The given data should belong to exactly one partition. It is the job of the connector to do
     * partitioning of the data before calling the API. Partition values are provided as map of
     * column name to partition value (as {@link Literal}). If the table is an un-partitioned
     * table, then map should be empty.
     *
     * @param tableClient      {@link TableClient} instance to use.
     * @param transactionState The transaction state
     * @param dataIter         Iterator of logical data to transform to physical data. All the
     *                         data in this iterator should belong to one physical partition.
     * @param partitionValues  The partition values for the data. If the table is un-partitioned,
     *                         the map should be empty
     * @return Iterator of physical data to write to the data files.
     */
    static CloseableIterator<FilteredColumnarBatch> transformLogicalData(
        TableClient tableClient,
        Row transactionState,
        CloseableIterator<FilteredColumnarBatch> dataIter,
        Map<String, Literal> partitionValues) {
        // Fail fast if the provided values don't exactly match the table's partition columns.
        validatePartitionValues(
            getPartitionColumnsList(transactionState), partitionValues);
        // TODO: add support for:
        // - enforcing the constraints
        // - generating the default value columns
        // - generating the generated columns

        // Remove the partition columns from the data as they are already part of file metadata
        // and are not needed in the data files. TODO: once we start supporting Uniform-compliant
        // tables, we may conditionally skip this step.

        // TODO: set the correct schema once writing into column mapping enabled table is
        // supported.
        return dataIter.map(
            filteredColumnarBatch -> {
                ColumnarBatch data = filteredColumnarBatch.getData();
                for (String partitionColName : partitionValues.keySet()) {
                    int partitionColIndex = data.getSchema().indexOf(partitionColName);
                    checkArgument(
                        partitionColIndex >= 0,
                        "Partition column %s not found in the data",
                        partitionColName);
                    // Drop the partition column; its value travels in the file metadata instead.
                    data = data.withDeletedColumnAt(partitionColIndex);
                }
                return new FilteredColumnarBatch(data,
                    filteredColumnarBatch.getSelectionVector());
            }
        );
    }

    /**
     * Get the context for writing data into a table. The context tells the connector where the
     * data should be written and what should be the target file size. For partitioned table
     * context is generated per partition. So, the connector should call this API for each
     * partition. For un-partitioned table, the context is same for all the data.
     *
     * @param tableClient      {@link TableClient} instance to use.
     * @param transactionState The transaction state
     * @param partitionValues  The partition values for the data. If the table is un-partitioned,
     *                         the map should be empty
     * @return {@link DataWriteContext} containing metadata about where and how the data for
     * partition should be written.
     */
    static DataWriteContext getWriteContext(
        TableClient tableClient,
        Row transactionState,
        Map<String, Literal> partitionValues) {
        validatePartitionValues(
            getPartitionColumnsList(transactionState),
            partitionValues);

        // Resolve the partition-specific subdirectory under the table root.
        String targetDirectory = getTargetDirectory(
            getTableRoot(transactionState),
            getPartitionColumnsList(transactionState),
            partitionValues);
        return new DataWriteContext(
            targetDirectory,
            partitionValues,
            getStatisticsColumns(tableClient, transactionState));
    }

    /**
     * For the given newly written data files, generate Delta actions that can be committed in a
     * transaction. These data files are the result of writing the data returned by
     * {@link Transaction#transformLogicalData} with the context returned by
     * {@link Transaction#getWriteContext}.
     *
     * @param tableClient      {@link TableClient} instance.
     * @param transactionState State of the transaction.
     * @param fileStatusIter   Iterator of row objects representing each data file written.
     * @param dataWriteContext The context used when writing the data files given in
     *                         {@code fileStatusIter}
     * @return {@link CloseableIterator} of {@link Row} representing the actions to commit using
     * {@link Transaction#commit}.
     */
    static CloseableIterator<Row> generateAppendActions(
        TableClient tableClient,
        Row transactionState,
        CloseableIterator<DataFileStatus> fileStatusIter,
        DataWriteContext dataWriteContext) {
        URI tableRoot = new Path(getTableRoot(transactionState)).toUri();
        return fileStatusIter.map(
            dataFileStatus -> {
                // Each written file becomes an AddFile action, relativized against the table
                // root; dataChange=true because appended rows change the table's visible data.
                Row addFileRow = AddFile.convertDataFileStatus(
                    tableRoot,
                    dataFileStatus,
                    dataWriteContext.getPartitionValues(),
                    true /* dataChange */);
                return SingleAction.createAddFileSingleAction(addFileRow);
            }
        );
    }
}

0 commit comments

Comments
 (0)
Please sign in to comment.