Skip to content

Commit fe5d931

Browse files
committed
[3.2][Kernel][Writes] APIs and impl. for creating new tables (#3016)
(Split from #2944) APIs and implementation for creating partitioned or unpartitioned tables. No data insertion yet. Will come in the next PR. Test suite
1 parent c8bbd5b commit fe5d931

29 files changed

+1702
-67
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright (2023) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel;
17+
18+
import java.util.List;
19+
import java.util.Map;
20+
21+
import io.delta.kernel.annotation.Evolving;
22+
import io.delta.kernel.data.Row;
23+
import io.delta.kernel.engine.Engine;
24+
import io.delta.kernel.expressions.Column;
25+
26+
/**
27+
* Contains the context for writing data to Delta table. The context is created for each partition
28+
* for partitioned table or once per table for un-partitioned table. It is created using
29+
* {@link Transaction#getWriteContext(Engine, Row, Map)} (String, Map, List)}.
30+
*
31+
* @since 3.2.0
32+
*/
33+
@Evolving
34+
public interface DataWriteContext {
35+
/**
36+
* Returns the target directory where the data should be written.
37+
*
38+
* @return fully qualified path of the target directory
39+
*/
40+
String getTargetDirectory();
41+
42+
/**
43+
* Returns the list of {@link Column} that the connector can optionally collect statistics. Each
44+
* {@link Column} is a reference to a top-level or nested column in the table.
45+
* <p>
46+
* Statistics collections can be skipped or collected for a partial list of the returned
47+
* {@link Column}s. When stats are present in the written Delta log, they can be used to
48+
* optimize query performance.
49+
*
50+
* @return schema of the statistics
51+
*/
52+
List<Column> getStatisticsColumns();
53+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright (2023) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel;
17+
18+
/**
19+
* An operation that can be performed on a Delta table.
20+
* <p>
21+
* An operation is tracked as the first line in commit info action inside the Delta Log
22+
* It also shows up when {@code DESCRIBE HISTORY} on the table is executed.
23+
*/
24+
public enum Operation {
25+
26+
/**
27+
* Recorded when the table is created.
28+
*/
29+
CREATE_TABLE("CREATE TABLE"),
30+
31+
/**
32+
* Recorded during batch inserts.
33+
*/
34+
WRITE("WRITE"),
35+
36+
/**
37+
* Recorded during streaming inserts.
38+
*/
39+
STREAMING_UPDATE("STREAMING UPDATE"),
40+
41+
/**
42+
* For any operation that doesn't fit the above categories.
43+
*/
44+
MANUAL_UPDATE("Manual Update");
45+
46+
/**
47+
* Actual value that will be recorded in the transaction log
48+
*/
49+
private final String description;
50+
51+
Operation(String description) {
52+
this.description = description;
53+
}
54+
55+
/**
56+
* Returns the string that will be recorded in the transaction log.
57+
*/
58+
public String getDescription() {
59+
return description;
60+
}
61+
}

kernel/kernel-api/src/main/java/io/delta/kernel/Table.java

+15
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,21 @@ Snapshot getSnapshotAsOfVersion(Engine engine, long versionId)
120120
Snapshot getSnapshotAsOfTimestamp(Engine engine, long millisSinceEpochUTC)
121121
throws TableNotFoundException;
122122

123+
/**
124+
* Create a {@link TransactionBuilder} which can create a {@link Transaction} object to mutate
125+
* the table.
126+
*
127+
* @param engine {@link Engine} instance to use.
128+
* @param engineInfo information about the engine that is making the updates.
129+
* @param operation metadata of operation that is being performed. E.g. "insert", "delete".
130+
* @return {@link TransactionBuilder} instance to build the transaction.
131+
* @since 3.2.0
132+
*/
133+
TransactionBuilder createTransactionBuilder(
134+
Engine engine,
135+
String engineInfo,
136+
Operation operation);
137+
123138
/**
124139
* Checkpoint the table at given version. It writes a single checkpoint file.
125140
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Copyright (2023) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel;
17+
18+
import java.util.List;
19+
import java.util.Map;
20+
21+
import io.delta.kernel.annotation.Evolving;
22+
import io.delta.kernel.data.FilteredColumnarBatch;
23+
import io.delta.kernel.data.Row;
24+
import io.delta.kernel.engine.Engine;
25+
import io.delta.kernel.exceptions.ConcurrentWriteException;
26+
import io.delta.kernel.expressions.Literal;
27+
import io.delta.kernel.types.StructType;
28+
import io.delta.kernel.utils.*;
29+
30+
/**
31+
* Represents a transaction to mutate a Delta table.
32+
*
33+
* @since 3.2.0
34+
*/
35+
@Evolving
36+
public interface Transaction {
37+
/**
38+
* Get the schema of the table. If the connector is adding any data to the table through this
39+
* transaction, it should have the same schema as the table schema.
40+
*/
41+
StructType getSchema(Engine engine);
42+
43+
/**
44+
* Get the list of logical names of the partition columns. This helps the connector to do
45+
* physical partitioning of the data before asking the Kernel to stage the data per partition.
46+
*/
47+
List<String> getPartitionColumns(Engine engine);
48+
49+
/**
50+
* Get the state of the transaction. The state helps Kernel do the transformations to logical
51+
* data according to the Delta protocol and table features enabled on the table. The engine
52+
* should use this at the data writer task to transform the logical data that the engine wants
53+
* to write to the table in to physical data that goes in data files using
54+
* {@link Transaction#transformLogicalData(Engine, Row, CloseableIterator, Map)}
55+
*/
56+
Row getTransactionState(Engine engine);
57+
58+
/**
59+
* Commit the transaction including the data action rows generated by
60+
* {@link Transaction#generateAppendActions}.
61+
*
62+
* @param engine {@link Engine} instance.
63+
* @param dataActions Iterable of data actions to commit. These data actions are generated by
64+
* the
65+
* {@link Transaction#generateAppendActions(Engine, Row, CloseableIterator,
66+
* DataWriteContext)}. The {@link CloseableIterable} allows the Kernel to
67+
* access the list of actions multiple times (in case of retries to resolve
68+
* the conflicts due to other writers to the table). Kernel provides a
69+
* in-memory based implementation of {@link CloseableIterable} with utility
70+
* API {@link CloseableIterable#inMemoryIterable(CloseableIterator)}
71+
* @return {@link TransactionCommitResult} status of the successful transaction.
72+
* @throws ConcurrentWriteException when the transaction has encountered a non-retryable
73+
* conflicts or exceeded the maximum number of retries reached.
74+
* The connector needs to rerun the query on top of the latest
75+
* table state and retry the transaction.
76+
*/
77+
TransactionCommitResult commit(Engine engine, CloseableIterable<Row> dataActions)
78+
throws ConcurrentWriteException;
79+
80+
/**
81+
* Given the logical data that needs to be written to the table, convert it into the required
82+
* physical data depending upon the table Delta protocol and features enabled on the table.
83+
* Kernel takes care of adding any additional column or removing existing columns that doesn't
84+
* need to be in physical data files. All these transformations are driven by the Delta protocol
85+
* and table features enabled on the table.
86+
* <p>
87+
* The given data should belong to exactly one partition. It is the job of the connector to do
88+
* partitioning of the data before calling the API. Partition values are provided as map of
89+
* column name to partition value (as {@link Literal}). If the table is an un-partitioned table,
90+
* then map should be empty.
91+
*
92+
* @param engine {@link Engine} instance to use.
93+
* @param transactionState The transaction state
94+
* @param dataIter Iterator of logical data (with schema same as the table schema)
95+
* to transform to physical data. All the data n this iterator should
96+
* belong to one physical partition and it should also include the
97+
* partition data.
98+
* @param partitionValues The partition values for the data. If the table is un-partitioned,
99+
* the map should be empty
100+
* @return Iterator of physical data to write to the data files.
101+
*/
102+
static CloseableIterator<FilteredColumnarBatch> transformLogicalData(
103+
Engine engine,
104+
Row transactionState,
105+
CloseableIterator<FilteredColumnarBatch> dataIter,
106+
Map<String, Literal> partitionValues) {
107+
throw new UnsupportedOperationException("Not implemented yet");
108+
}
109+
110+
/**
111+
* Get the context for writing data into a table. The context tells the connector where the data
112+
* should be written. For partitioned table context is generated per partition. So, the
113+
* connector should call this API for each partition. For un-partitioned table, the context is
114+
* same for all the data.
115+
*
116+
* @param engine {@link Engine} instance to use.
117+
* @param transactionState The transaction state
118+
* @param partitionValues The partition values for the data. If the table is un-partitioned,
119+
* the map should be empty
120+
* @return {@link DataWriteContext} containing metadata about where and how the data for
121+
* partition should be written.
122+
*/
123+
static DataWriteContext getWriteContext(
124+
Engine engine,
125+
Row transactionState,
126+
Map<String, Literal> partitionValues) {
127+
throw new UnsupportedOperationException("Not implemented yet");
128+
}
129+
130+
/**
131+
* For given data files, generate Delta actions that can be committed in a transaction.
132+
* These data files are the result of writing the data returned by
133+
* {@link Transaction#transformLogicalData} with the context returned by
134+
* {@link Transaction#getWriteContext}.
135+
*
136+
* @param engine {@link Engine} instance.
137+
* @param transactionState State of the transaction.
138+
* @param fileStatusIter Iterator of row objects representing each data file written.
139+
* @param dataWriteContext The context used when writing the data files given in
140+
* {@code fileStatusIter}
141+
* @return {@link CloseableIterator} of {@link Row} representing the actions to commit using
142+
* {@link Transaction#commit}.
143+
*/
144+
static CloseableIterator<Row> generateAppendActions(
145+
Engine engine,
146+
Row transactionState,
147+
CloseableIterator<DataFileStatus> fileStatusIter,
148+
DataWriteContext dataWriteContext) {
149+
throw new UnsupportedOperationException("Not implemented yet");
150+
}
151+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright (2023) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel;
17+
18+
import java.util.List;
19+
20+
import io.delta.kernel.annotation.Evolving;
21+
import io.delta.kernel.engine.Engine;
22+
import io.delta.kernel.types.StructType;
23+
24+
/**
25+
* Builder for creating a {@link Transaction} to mutate a Delta table.
26+
*
27+
* @since 3.2.0
28+
*/
29+
@Evolving
30+
public interface TransactionBuilder {
31+
/**
32+
* Set the schema of the table when creating a new table.
33+
*
34+
* @param engine {@link Engine} instance to use.
35+
* @param schema The new schema of the table.
36+
* @return updated {@link TransactionBuilder} instance.
37+
*/
38+
TransactionBuilder withSchema(Engine engine, StructType schema);
39+
40+
/**
41+
* Set the list of partitions columns when create a new partitioned table.
42+
*
43+
* @param engine {@link Engine} instance to use.
44+
* @param partitionColumns The partition columns of the table. These should be a subset of the
45+
* columns in the schema.
46+
* @return updated {@link TransactionBuilder} instance.
47+
*/
48+
TransactionBuilder withPartitionColumns(Engine engine, List<String> partitionColumns);
49+
50+
/**
51+
* Build the transaction. Also validates the given info to ensure that a valida transaction
52+
* can be created.
53+
*
54+
* @param engine {@link Engine} instance to use.
55+
*/
56+
Transaction build(Engine engine);
57+
}

0 commit comments

Comments
 (0)