[#2543] feat(spark-connector): support row-level operations to iceberg Table #2642

Closed
wants to merge 345 commits into from

345 commits
b7145a4
fix
caican00 Mar 27, 2024
35bb0bf
Merge remote-tracking branch 'upstream_dev/iceberg-read-write' into i…
caican00 Mar 27, 2024
e543cdb
fix
caican00 Mar 27, 2024
497b1dd
fix
caican00 Mar 27, 2024
df700cc
fix
caican00 Mar 27, 2024
1238187
Merge remote-tracking branch 'upstream_dev/iceberg-read-write' into i…
caican00 Mar 27, 2024
87d6dd9
Merge branch 'main' into iceberg-delete
caican00 Mar 27, 2024
b49b585
Merge remote-tracking branch 'upstream/main' into iceberg-delete
caican00 Mar 27, 2024
38a069e
Merge remote-tracking branch 'upstream_dev/iceberg-read-write' into i…
caican00 Mar 27, 2024
6e3fbcb
Merge branch 'iceberg-delete' of github.com:caican00/gravitino into i…
caican00 Mar 27, 2024
74c7d2e
fix
caican00 Mar 27, 2024
03a5b4b
Merge branch 'main' into iceberg-read-write
caican00 Mar 27, 2024
34aa9e4
Merge branch 'main' into iceberg-row-level-update
caican00 Mar 27, 2024
eeb6318
fix
caican00 Mar 27, 2024
2d09006
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-r…
caican00 Mar 27, 2024
e4ad4d4
Merge branch 'iceberg-read-write' of github.com:caican00/gravitino in…
caican00 Mar 27, 2024
9b85805
Merge branch 'main' into iceberg-delete
caican00 Mar 27, 2024
66f45e6
fix
caican00 Mar 27, 2024
362acfe
Merge remote-tracking branch 'upstream/main' into iceberg-row-level-u…
caican00 Mar 27, 2024
f67d7a6
Merge remote-tracking branch 'upstream_dev/iceberg-row-level-update' …
caican00 Mar 27, 2024
2a994c7
fix
caican00 Mar 27, 2024
d549207
fix
caican00 Mar 27, 2024
c8e8948
fix
caican00 Mar 27, 2024
b8e61dc
Merge branch 'main' into iceberg-read-write
caican00 Mar 27, 2024
820535d
fix
caican00 Mar 28, 2024
0bfdaaf
Merge remote-tracking branch 'upstream_dev/iceberg-read-write' into i…
caican00 Mar 28, 2024
0c16dea
Merge branch 'main' into iceberg-row-level-update
caican00 Mar 28, 2024
ad98ce3
Merge branch 'main' into iceberg-read-write
caican00 Mar 28, 2024
b273dbc
Merge branch 'main' into iceberg-delete
caican00 Mar 28, 2024
85f009e
[#2586] feat(spark-connector): Support iceberg partition and distribu…
caican00 Mar 28, 2024
69b440e
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-p…
caican00 Mar 28, 2024
99b0909
Merge branch 'main' into iceberg-read-write
caican00 Mar 28, 2024
e84e943
update its.
caican00 Mar 28, 2024
11e94ba
Merge remote-tracking branch 'upstream/main' into iceberg-metadata-co…
caican00 Mar 28, 2024
2bd021c
Merge remote-tracking branch 'upstream_dev/iceberg-read-write' into i…
caican00 Mar 28, 2024
db574da
Merge branch 'main' into iceberg-delete
caican00 Mar 28, 2024
682b942
[#2587] feat(spark-connector): Support iceberg metadataColumns
caican00 Mar 28, 2024
a9dcdbc
[#2587] feat(spark-connector): Support iceberg metadataColumns
caican00 Mar 28, 2024
cfdeb56
[#2587] feat(spark-connector): Support iceberg metadataColumns
caican00 Mar 28, 2024
0aba061
[#2587] feat(spark-connector): Support iceberg metadataColumns
caican00 Mar 28, 2024
d0bd521
[#2587] feat(spark-connector): Support iceberg metadataColumns
caican00 Mar 28, 2024
8a9deab
Merge remote-tracking branch 'upstream_dev/iceberg-read-write' into i…
caican00 Mar 28, 2024
2390043
Merge remote-tracking branch 'upstream/main' into iceberg-row-level-u…
caican00 Mar 28, 2024
c08c195
Merge remote-tracking branch 'upstream_dev/iceberg-read-write' into i…
caican00 Mar 28, 2024
33e8385
Merge remote-tracking branch 'upstream_dev/iceberg-row-level-update' …
caican00 Mar 28, 2024
b5cb75f
update
caican00 Mar 28, 2024
1232f80
update
caican00 Mar 28, 2024
cff3831
Merge branch 'main' into iceberg-delete
caican00 Mar 29, 2024
fb420d5
Merge branch 'main' into iceberg-delete
caican00 Mar 29, 2024
1662957
Merge remote-tracking branch 'upstream/main' into iceberg-metadata-co…
caican00 Mar 29, 2024
a337e14
Merge remote-tracking branch 'upstream/main' into iceberg-read-write
caican00 Mar 31, 2024
a2e8efd
update
caican00 Mar 31, 2024
69a7af6
update
caican00 Mar 31, 2024
3d494ac
update
caican00 Apr 1, 2024
824abe3
Merge remote-tracking branch 'upstream_dev/iceberg-read-write' into i…
caican00 Apr 1, 2024
015825d
Merge remote-tracking branch 'upstream_dev/iceberg-read-write' into i…
caican00 Apr 1, 2024
fd37b9c
Merge remote-tracking branch 'upstream_dev/iceberg-read-write' into i…
caican00 Apr 1, 2024
a6f6402
Merge remote-tracking branch 'upstream_dev/iceberg-read-write' into i…
caican00 Apr 1, 2024
924b5f0
Merge remote-tracking branch 'upstream_dev/iceberg-partition' into ic…
caican00 Apr 1, 2024
ba1d2a9
update
caican00 Apr 1, 2024
2301e35
update
caican00 Apr 1, 2024
8f632cb
Merge branch 'main' into iceberg-read-write
caican00 Apr 1, 2024
4a8072b
Merge remote-tracking branch 'upstream_dev/iceberg-read-write' into i…
caican00 Apr 1, 2024
bf5a4b7
Merge remote-tracking branch 'upstream_dev/iceberg-read-write' into i…
caican00 Apr 1, 2024
f225962
Merge remote-tracking branch 'upstream_dev/iceberg-read-write' into i…
caican00 Apr 1, 2024
c4051e2
Merge branch 'main' into iceberg-row-level-update
caican00 Apr 1, 2024
1ac625f
update
caican00 Apr 1, 2024
22b4afa
Merge branch 'iceberg-row-level-update' of github.com:caican00/gravit…
caican00 Apr 1, 2024
71577ed
update
caican00 Apr 2, 2024
d64e251
Merge branch 'main' into iceberg-read-write
caican00 Apr 2, 2024
6b16d0a
update
caican00 Apr 2, 2024
c319059
update
caican00 Apr 2, 2024
8f13efc
update
caican00 Apr 2, 2024
509b0d6
update
caican00 Apr 2, 2024
d136e7f
Merge branch 'main' into iceberg-read-write
caican00 Apr 2, 2024
7ac1b17
update
caican00 Apr 2, 2024
607de86
Merge remote-tracking branch 'upstream_dev/iceberg-read-write' into i…
caican00 Apr 2, 2024
57b3530
update
caican00 Apr 2, 2024
5c6a9ef
Merge remote-tracking branch 'upstream/main' into iceberg-row-level-u…
caican00 Apr 2, 2024
57ddb0f
update
caican00 Apr 2, 2024
f8a415c
Merge branch 'main' into iceberg-delete
caican00 Apr 2, 2024
a1b0dee
update
caican00 Apr 2, 2024
de462c5
Merge branch 'iceberg-delete' of github.com:caican00/gravitino into i…
caican00 Apr 2, 2024
7dca936
update
caican00 Apr 3, 2024
8bb51b1
update
caican00 Apr 3, 2024
ac93aff
update
caican00 Apr 3, 2024
cbd4e0a
update
caican00 Apr 3, 2024
e3fcf42
update
caican00 Apr 3, 2024
8406bb2
Merge branch 'main' into iceberg-delete
caican00 Apr 3, 2024
0a8c3e2
Merge branch 'main' into iceberg-delete
caican00 Apr 3, 2024
297e2fb
update
caican00 Apr 3, 2024
82ba37f
Merge branch 'iceberg-delete' of github.com:caican00/gravitino into i…
caican00 Apr 3, 2024
d849f56
update
caican00 Apr 3, 2024
0275401
Merge branch 'main' into iceberg-delete
yuqi1129 Apr 3, 2024
b372d1a
update
caican00 Apr 7, 2024
6012301
Merge remote-tracking branch 'upstream/main' into iceberg-partition
caican00 Apr 7, 2024
2e5ae32
Merge branch 'main' into iceberg-partition
caican00 Apr 7, 2024
222bf39
Merge remote-tracking branch 'upstream_dev/iceberg-delete' into icebe…
caican00 Apr 7, 2024
4fc47e5
update
caican00 Apr 7, 2024
b64494e
[#2824] bugfix(catalog-lakehouse-iceberg): Fixed abnormal timestamp t…
caican00 Apr 7, 2024
fef3e4d
Merge remote-tracking branch 'upstream_dev/iceberg-timestamp-fix' int…
caican00 Apr 7, 2024
934885f
update
caican00 Apr 7, 2024
5f17a2b
update
caican00 Apr 7, 2024
6e260cd
just for test
caican00 Apr 7, 2024
17ddcf0
Merge remote-tracking branch 'upstream/main' into iceberg-partition
caican00 Apr 7, 2024
0b7db9a
just for test
caican00 Apr 7, 2024
d96bc5b
just for test
caican00 Apr 7, 2024
cb5aced
update
caican00 Apr 7, 2024
22585a0
Merge branch 'main' into iceberg-partition
caican00 Apr 7, 2024
c9b4e19
update
caican00 Apr 8, 2024
8aab4f1
Merge branch 'iceberg-partition' of github.com:caican00/gravitino int…
caican00 Apr 8, 2024
625ce0b
update
caican00 Apr 8, 2024
9633b8f
update
caican00 Apr 8, 2024
33a8c60
update
caican00 Apr 8, 2024
2645a18
update
caican00 Apr 8, 2024
03f64a1
update
caican00 Apr 8, 2024
01c3197
update
caican00 Apr 8, 2024
685eb5f
test
caican00 Apr 8, 2024
8586f08
update
caican00 Apr 8, 2024
586cb31
Merge branch 'main' into iceberg-partition
caican00 Apr 8, 2024
783b682
update
caican00 Apr 8, 2024
19331ed
Merge branch 'iceberg-partition' of github.com:caican00/gravitino int…
caican00 Apr 8, 2024
20408a3
[#2845] Improvement[spark-connector]: Refactor the table implementati…
caican00 Apr 9, 2024
2473ffc
Merge branch 'main' into iceberg-refactor
caican00 Apr 9, 2024
25d57d6
[#2845] Improvement[spark-connector]: Refactor the table implementati…
caican00 Apr 9, 2024
254d1b2
Merge branch 'main' into iceberg-refactor
caican00 Apr 9, 2024
85adb65
Merge branch 'iceberg-refactor' of github.com:caican00/gravitino into…
caican00 Apr 9, 2024
2eb8bc9
[#2845] Improvement[spark-connector]: Refactor the table implementati…
caican00 Apr 9, 2024
fa56970
Merge branch 'main' into iceberg-refactor
caican00 Apr 9, 2024
7adaa23
[#2845] Improvement[spark-connector]: Refactor the table implementati…
caican00 Apr 9, 2024
7d104cd
Merge branch 'iceberg-refactor' of github.com:caican00/gravitino into…
caican00 Apr 9, 2024
eece580
[#2845] Improvement[spark-connector]: Refactor the table implementati…
caican00 Apr 9, 2024
f1e8aaa
Merge branch 'main' into iceberg-refactor
caican00 Apr 9, 2024
f1d2a7f
Merge branch 'main' into iceberg-refactor
caican00 Apr 9, 2024
d56cc48
[#2845] Improvement[spark-connector]: Refactor the table implementati…
caican00 Apr 9, 2024
454e8cc
[#2845] Improvement[spark-connector]: Refactor the table implementati…
caican00 Apr 9, 2024
26b4889
[#2845] Improvement[spark-connector]: Refactor the table implementati…
caican00 Apr 9, 2024
f355fa3
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-m…
caican00 Apr 9, 2024
8cc1ad3
Merge branch 'main' into iceberg-refactor
caican00 Apr 9, 2024
cfca204
Merge branch 'main' into iceberg-refactor
caican00 Apr 9, 2024
ea234c2
Merge branch 'main' into iceberg-refactor
caican00 Apr 9, 2024
f4360b4
updated
caican00 Apr 9, 2024
7d444cc
Merge remote-tracking branch 'upstream/main' into iceberg-metadata-co…
caican00 Apr 9, 2024
2798406
Merge remote-tracking branch 'upstream_dev/iceberg-partition' into ic…
caican00 Apr 9, 2024
174dfd2
updated
caican00 Apr 9, 2024
e759b46
Merge remote-tracking branch 'upstream/main' into iceberg-partition
caican00 Apr 9, 2024
698c209
updated
caican00 Apr 9, 2024
1487c97
updated
caican00 Apr 10, 2024
c7b69fe
updated
caican00 Apr 10, 2024
8625360
Merge remote-tracking branch 'upstream/main' into iceberg-row-level-u…
caican00 Apr 10, 2024
0b6ae79
updated
caican00 Apr 10, 2024
4f21655
updated
caican00 Apr 10, 2024
2e690cd
Merge remote-tracking branch 'upstream_dev/iceberg-refactor' into ice…
caican00 Apr 10, 2024
4b699b0
updated
caican00 Apr 10, 2024
9f90e52
Merge remote-tracking branch 'upstream_dev/iceberg-partition' into ic…
caican00 Apr 10, 2024
44fb207
updated
caican00 Apr 10, 2024
ab9e0a3
Merge remote-tracking branch 'upstream_dev/iceberg-metadata-columns' …
caican00 Apr 10, 2024
cac9f7f
updated
caican00 Apr 10, 2024
b4f293a
updated
caican00 Apr 10, 2024
0318139
Merge remote-tracking branch 'upstream_dev/iceberg-metadata-columns' …
caican00 Apr 10, 2024
90de5f2
updated
caican00 Apr 10, 2024
a847328
updated
caican00 Apr 10, 2024
0628e6b
updated
caican00 Apr 10, 2024
216d98a
Merge remote-tracking branch 'upstream_dev/iceberg-partition' into ic…
caican00 Apr 10, 2024
13c6331
Merge remote-tracking branch 'upstream_dev/iceberg-refactor' into ice…
caican00 Apr 10, 2024
57cf7ef
Merge remote-tracking branch 'upstream_dev/iceberg-partition' into ic…
caican00 Apr 10, 2024
caaaca3
Merge remote-tracking branch 'upstream_dev/iceberg-metadata-columns' …
caican00 Apr 10, 2024
17d02ae
updated
caican00 Apr 10, 2024
b0eea4c
updated
caican00 Apr 10, 2024
b633c61
Merge remote-tracking branch 'upstream_dev/iceberg-partition' into ic…
caican00 Apr 10, 2024
ab09c1d
updated
caican00 Apr 10, 2024
d4eef71
updated
caican00 Apr 10, 2024
2ad9aca
updated
caican00 Apr 10, 2024
d179024
Merge remote-tracking branch 'upstream_dev/iceberg-partition' into ic…
caican00 Apr 10, 2024
2fcd82d
updated
caican00 Apr 10, 2024
b21d698
updated
caican00 Apr 10, 2024
f1b40e9
Merge remote-tracking branch 'upstream_dev/iceberg-metadata-columns' …
caican00 Apr 10, 2024
4df485d
updated
caican00 Apr 10, 2024
fe409e3
updated
caican00 Apr 10, 2024
34474b2
updated
caican00 Apr 10, 2024
a15129a
Merge branch 'main' into iceberg-row-level-update
caican00 Apr 10, 2024
bd4566e
Merge branch 'main' into iceberg-metadata-columns
caican00 Apr 10, 2024
8c1ed27
Merge branch 'main' into iceberg-row-level-update
caican00 Apr 10, 2024
1c185a6
Merge branch 'main' into iceberg-row-level-update
caican00 Apr 10, 2024
cdb31ef
updated
caican00 Apr 10, 2024
83bc95e
Merge remote-tracking branch 'upstream/main' into iceberg-row-level-u…
caican00 Apr 11, 2024
06a4621
updated
caican00 Apr 11, 2024
e119ca9
Merge remote-tracking branch 'upstream_dev/iceberg-row-level-update' …
caican00 Apr 11, 2024
889c402
updated
caican00 Apr 11, 2024
65cd4ff
[#2927] Improvement(catalog-lakehouse-iceberg): Support more file for…
caican00 Apr 13, 2024
6e8af10
[#2927] Improvement(catalog-lakehouse-iceberg): Support more file for…
caican00 Apr 13, 2024
6a50948
update
caican00 Apr 14, 2024
4678e0b
Merge branch 'main' into iceberg-row-level-update
caican00 Apr 14, 2024
883d848
update
caican00 Apr 14, 2024
e3d8baf
Merge branch 'iceberg-row-level-update' of github.com:caican00/gravit…
caican00 Apr 14, 2024
d8e87f8
update
caican00 Apr 14, 2024
2d021b9
update
caican00 Apr 15, 2024
fc09aef
update
caican00 Apr 15, 2024
99a339a
update
caican00 Apr 15, 2024
69c7321
update
caican00 Apr 15, 2024
d51903a
update
caican00 Apr 15, 2024
99cbbb3
Merge remote-tracking branch 'origin/main' into iceberg-partition
caican00 Apr 17, 2024
4f237f2
Merge remote-tracking branch 'upstream_dev/iceberg-create-properties'…
caican00 Apr 17, 2024
5b4d1b5
update
caican00 Apr 17, 2024
ae75125
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-p…
caican00 Apr 17, 2024
6d18ecf
update
caican00 Apr 17, 2024
de85245
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-p…
caican00 Apr 17, 2024
6c48bce
update
caican00 Apr 17, 2024
21ec483
test
caican00 Apr 17, 2024
c58cd03
Merge branch 'main' into iceberg-partition
caican00 Apr 17, 2024
2d6f364
update trino sql file
caican00 Apr 17, 2024
172df2a
Merge branch 'iceberg-partition' of github.com:caican00/gravitino int…
caican00 Apr 17, 2024
565f491
update
caican00 Apr 17, 2024
591f1b4
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-p…
caican00 Apr 17, 2024
80c9b6b
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-m…
caican00 Apr 17, 2024
cd35668
update
caican00 Apr 17, 2024
e98e452
update
caican00 Apr 17, 2024
5515b62
update
caican00 Apr 17, 2024
45099a0
update
caican00 Apr 17, 2024
2848dfc
update
caican00 Apr 17, 2024
37808ae
Merge branch 'iceberg-partition' of github.com:caican00/gravitino int…
caican00 Apr 17, 2024
99280af
update
caican00 Apr 17, 2024
797cb74
update
caican00 Apr 17, 2024
939159a
update
caican00 Apr 17, 2024
ce33ebd
update
caican00 Apr 17, 2024
545b1a1
update
caican00 Apr 17, 2024
c84934d
update
caican00 Apr 17, 2024
4310826
Merge branch 'iceberg-partition' of github.com:caican00/gravitino int…
caican00 Apr 17, 2024
d20bcbd
update
caican00 Apr 17, 2024
419ca58
Merge branch 'iceberg-partition' of github.com:caican00/gravitino int…
caican00 Apr 17, 2024
8e61de3
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-r…
caican00 Apr 17, 2024
ae60022
Merge branch 'iceberg-metadata-columns' of github.com:caican00/gravit…
caican00 Apr 17, 2024
12e540e
update
caican00 Apr 17, 2024
22d4dda
update
caican00 Apr 17, 2024
8a1fa50
Merge branch 'iceberg-partition' of github.com:caican00/gravitino int…
caican00 Apr 17, 2024
17d720c
Merge branch 'main' into iceberg-row-level-update
caican00 Apr 18, 2024
cde1cb2
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-r…
caican00 Apr 24, 2024
db17806
update
caican00 Apr 24, 2024
33af3f9
update
caican00 Apr 24, 2024
4baff26
update
caican00 Apr 24, 2024
a98739f
update
caican00 Apr 24, 2024
0f83832
update
caican00 Apr 25, 2024
d6f0b5d
update
caican00 Apr 25, 2024
9776c0b
update
caican00 Apr 29, 2024
4cf028e
Merge branch 'main' into iceberg-row-level-update
caican00 Apr 29, 2024
69c4ece
update
caican00 Apr 30, 2024
0dfa669
Merge branch 'iceberg-row-level-update' of github.com:caican00/gravit…
caican00 Apr 30, 2024
5ae086b
Merge branch 'main' into iceberg-row-level-update
caican00 Apr 30, 2024
51921e7
update
caican00 Apr 30, 2024
7303841
Merge branch 'iceberg-row-level-update' of github.com:caican00/gravit…
caican00 Apr 30, 2024
1 change: 1 addition & 0 deletions LICENSE
@@ -219,6 +219,7 @@
./api/src/main/java/com/datastrato/gravitino/rel/expressions/Expression.java
./api/src/main/java/com/datastrato/gravitino/rel/expressions/NamedReference.java
./integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkUtilIT.java
./spark-connector/spark-connector/src/main/scala/org/apache/spark/sql/iceberg/catalyst/parser/GravitinoIcebergSparkSqlExtensionsParser.scala

Apache Iceberg
./api/src/main/java/com/datastrato/gravitino/exceptions/RESTException.java
@@ -68,11 +68,39 @@ protected static String getDeleteSql(String tableName, String condition) {
return String.format("DELETE FROM %s where %s", tableName, condition);
}

private static String getUpdateTableSql(String tableName, String setClause, String whereClause) {
return String.format("UPDATE %s SET %s WHERE %s", tableName, setClause, whereClause);
}

private static String getRowLevelUpdateTableSql(
String targetTableName, String selectClause, String sourceTableName, String onClause) {
return String.format(
"MERGE INTO %s "
+ "USING (%s) %s "
+ "ON %s "
+ "WHEN MATCHED THEN UPDATE SET * "
+ "WHEN NOT MATCHED THEN INSERT *",
targetTableName, selectClause, sourceTableName, onClause);
}

private static String getRowLevelDeleteTableSql(
String targetTableName, String selectClause, String sourceTableName, String onClause) {
return String.format(
"MERGE INTO %s "
+ "USING (%s) %s "
+ "ON %s "
+ "WHEN MATCHED THEN DELETE "
+ "WHEN NOT MATCHED THEN INSERT *",
targetTableName, selectClause, sourceTableName, onClause);
}
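
The two MERGE INTO builders above differ only in the WHEN MATCHED action (`UPDATE SET *` versus `DELETE`); both insert unmatched source rows. A minimal standalone sketch of the delete variant — class and method names here are illustrative, not part of the PR — shows the SQL string it produces:

```java
// Standalone sketch mirroring getRowLevelDeleteTableSql above.
// Matched target rows are deleted; unmatched source rows are inserted.
public class MergeSqlSketch {

    static String rowLevelDeleteSql(
            String target, String selectClause, String source, String onClause) {
        return String.format(
                "MERGE INTO %s "
                        + "USING (%s) %s "
                        + "ON %s "
                        + "WHEN MATCHED THEN DELETE "
                        + "WHEN NOT MATCHED THEN INSERT *",
                target, selectClause, source, onClause);
    }

    public static void main(String[] args) {
        // prints: MERGE INTO t USING (SELECT 1 AS id) s ON t.id = s.id
        //         WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT *
        System.out.println(rowLevelDeleteSql("t", "SELECT 1 AS id", "s", "t.id = s.id"));
    }
}
```

Note that `UPDATE SET *` / `INSERT *` are MERGE INTO shorthands supported by the Iceberg Spark SQL extensions rather than plain Spark SQL, which is consistent with these tests being gated behind the catalog's row-level-operation support.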

// Whether [CLUSTERED BY col_name3 SORTED BY col_name INTO num_buckets BUCKETS] is supported
protected abstract boolean supportsSparkSQLClusteredBy();

protected abstract boolean supportsPartition();

protected abstract boolean supportsDelete();

// Use a custom database rather than the default database, because SparkCommonIT couldn't
// read & write data to tables in the default database. The main reason is that the default
// database location is determined by `hive.metastore.warehouse.dir` in hive-site.xml, which is a local HDFS address
@@ -702,6 +730,28 @@ void testTableOptions() {
checkTableReadWrite(tableInfo);
}

@Test
@EnabledIf("supportsDelete")
void testDeleteOperation() {
String tableName = "test_delete_table";
dropTableIfExists(tableName);
createSimpleTable(tableName);

SparkTableInfo table = getTableInfo(tableName);
checkTableColumns(tableName, getSimpleTableColumn(), table);
sql(
String.format(
"INSERT INTO %s VALUES (1, '1', 1),(2, '2', 2),(3, '3', 3),(4, '4', 4),(5, '5', 5)",
tableName));
List<String> queryResult1 = getTableData(tableName);
Assertions.assertEquals(5, queryResult1.size());
Assertions.assertEquals("1,1,1;2,2,2;3,3,3;4,4,4;5,5,5", String.join(";", queryResult1));
sql(getDeleteSql(tableName, "id <= 4"));
List<String> queryResult2 = getTableData(tableName);
Assertions.assertEquals(1, queryResult2.size());
Assertions.assertEquals("5,5,5", queryResult2.get(0));
}

protected void checkTableReadWrite(SparkTableInfo table) {
String name = table.getTableIdentifier();
boolean isPartitionTable = table.isPartitionTable();
@@ -760,6 +810,49 @@ protected String getExpectedTableData(SparkTableInfo table) {
.collect(Collectors.joining(","));
}

protected void checkTableRowLevelUpdate(String tableName) {
writeToEmptyTableAndCheckData(tableName);
String updatedValues = "id = 6, name = '6', age = 6";
sql(getUpdateTableSql(tableName, updatedValues, "id = 5"));
List<String> queryResult = getQueryData(getSelectAllSqlWithOrder(tableName));
Assertions.assertEquals(5, queryResult.size());
Assertions.assertEquals("1,1,1;2,2,2;3,3,3;4,4,4;6,6,6", String.join(";", queryResult));
}

protected void checkTableRowLevelDelete(String tableName) {
writeToEmptyTableAndCheckData(tableName);
sql(getDeleteSql(tableName, "id <= 2"));
List<String> queryResult = getQueryData(getSelectAllSqlWithOrder(tableName));
Assertions.assertEquals(3, queryResult.size());
Assertions.assertEquals("3,3,3;4,4,4;5,5,5", String.join(";", queryResult));
}

protected void checkTableDeleteByMergeInto(String tableName) {
writeToEmptyTableAndCheckData(tableName);

String sourceTableName = "source_table";
String selectClause =
"SELECT 1 AS id, '1' AS name, 1 AS age UNION ALL SELECT 6 AS id, '6' AS name, 6 AS age";
String onClause = String.format("%s.id = %s.id", tableName, sourceTableName);
sql(getRowLevelDeleteTableSql(tableName, selectClause, sourceTableName, onClause));
List<String> queryResult = getQueryData(getSelectAllSqlWithOrder(tableName));
Assertions.assertEquals(5, queryResult.size());
Assertions.assertEquals("2,2,2;3,3,3;4,4,4;5,5,5;6,6,6", String.join(";", queryResult));
}

protected void checkTableUpdateByMergeInto(String tableName) {
writeToEmptyTableAndCheckData(tableName);

String sourceTableName = "source_table";
String selectClause =
"SELECT 1 AS id, '2' AS name, 2 AS age UNION ALL SELECT 6 AS id, '6' AS name, 6 AS age";
String onClause = String.format("%s.id = %s.id", tableName, sourceTableName);
sql(getRowLevelUpdateTableSql(tableName, selectClause, sourceTableName, onClause));
List<String> queryResult = getQueryData(getSelectAllSqlWithOrder(tableName));
Assertions.assertEquals(6, queryResult.size());
Assertions.assertEquals("1,2,2;2,2,2;3,3,3;4,4,4;5,5,5;6,6,6", String.join(";", queryResult));
}

protected String getCreateSimpleTableString(String tableName) {
return getCreateSimpleTableString(tableName, false);
}
@@ -801,6 +894,16 @@ protected void checkTableColumns(
.check(tableInfo);
}

private void writeToEmptyTableAndCheckData(String tableName) {
sql(
String.format(
"INSERT INTO %s VALUES (1, '1', 1),(2, '2', 2),(3, '3', 3),(4, '4', 4),(5, '5', 5)",
tableName));
List<String> queryResult = getTableData(tableName);
Assertions.assertEquals(5, queryResult.size());
Assertions.assertEquals("1,1,1;2,2,2;3,3,3;4,4,4;5,5,5", String.join(";", queryResult));
}

// partition expression may contain "'", like a='s'/b=1
private String getPartitionExpression(SparkTableInfo table, String delimiter) {
return table.getPartitionedColumns().stream()
@@ -55,6 +55,11 @@ protected boolean supportsPartition() {
return true;
}

@Override
protected boolean supportsDelete() {
return false;
}

@Test
public void testCreateHiveFormatPartitionTable() {
String tableName = "hive_partition_table";
@@ -9,6 +9,7 @@
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo;
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfoChecker;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
@@ -18,10 +19,12 @@
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -30,13 +33,21 @@
import org.apache.spark.sql.connector.catalog.FunctionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
import org.apache.spark.sql.internal.StaticSQLConf;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.platform.commons.util.StringUtils;
import scala.Tuple3;

public abstract class SparkIcebergCatalogIT extends SparkCommonIT {

private static final String ICEBERG_FORMAT_VERSION = "format-version";
private static final String ICEBERG_DELETE_MODE = "write.delete.mode";
private static final String ICEBERG_UPDATE_MODE = "write.update.mode";
private static final String ICEBERG_MERGE_MODE = "write.merge.mode";

@Override
protected String getCatalogName() {
return "iceberg";
@@ -57,6 +68,11 @@ protected boolean supportsPartition() {
return true;
}

@Override
protected boolean supportsDelete() {
return true;
}

@Override
protected String getTableLocation(SparkTableInfo table) {
return String.join(File.separator, table.getTableLocation(), "data");
@@ -216,6 +232,26 @@ void testIcebergMetadataColumns() throws NoSuchTableException {
testDeleteMetadataColumn();
}

@Test
void testInjectSparkExtensions() {
SparkSession sparkSession = getSparkSession();
SparkConf conf = sparkSession.sparkContext().getConf();
Assertions.assertTrue(conf.contains(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key()));
String extensions = conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key());
Assertions.assertTrue(StringUtils.isNotBlank(extensions));
Assertions.assertEquals(
"org.apache.spark.sql.iceberg.extensions.GravitinoIcebergSparkSessionExtensions,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
extensions);
}

@Test
void testIcebergTableRowLevelOperations() {
testIcebergDeleteOperation();
testIcebergUpdateOperation();
testIcebergMergeIntoDeleteOperation();
testIcebergMergeIntoUpdateOperation();
}

private void testMetadataColumns() {
String tableName = "test_metadata_columns";
dropTableIfExists(tableName);
@@ -386,6 +422,88 @@ private void testDeleteMetadataColumn() {
Assertions.assertEquals(0, queryResult1.size());
}

private void testIcebergDeleteOperation() {
getIcebergTablePropertyValues()
.forEach(
tuple -> {
String tableName =
String.format("test_iceberg_%s_%s_delete_operation", tuple._1(), tuple._2());
dropTableIfExists(tableName);
createIcebergTableWithTabProperties(
tableName,
tuple._1(),
ImmutableMap.of(
ICEBERG_FORMAT_VERSION,
String.valueOf(tuple._2()),
ICEBERG_DELETE_MODE,
tuple._3()));
checkTableColumns(tableName, getSimpleTableColumn(), getTableInfo(tableName));
checkTableRowLevelDelete(tableName);
});
}

private void testIcebergUpdateOperation() {
getIcebergTablePropertyValues()
.forEach(
tuple -> {
String tableName =
String.format("test_iceberg_%s_%s_update_operation", tuple._1(), tuple._2());
dropTableIfExists(tableName);
createIcebergTableWithTabProperties(
tableName,
tuple._1(),
ImmutableMap.of(
ICEBERG_FORMAT_VERSION,
String.valueOf(tuple._2()),
ICEBERG_UPDATE_MODE,
tuple._3()));
checkTableColumns(tableName, getSimpleTableColumn(), getTableInfo(tableName));
checkTableRowLevelUpdate(tableName);
});
}

private void testIcebergMergeIntoDeleteOperation() {
getIcebergTablePropertyValues()
.forEach(
tuple -> {
String tableName =
String.format(
"test_iceberg_%s_%s_mergeinto_delete_operation", tuple._1(), tuple._2());
dropTableIfExists(tableName);
createIcebergTableWithTabProperties(
tableName,
tuple._1(),
ImmutableMap.of(
ICEBERG_FORMAT_VERSION,
String.valueOf(tuple._2()),
ICEBERG_MERGE_MODE,
tuple._3()));
checkTableColumns(tableName, getSimpleTableColumn(), getTableInfo(tableName));
checkTableDeleteByMergeInto(tableName);
});
}

private void testIcebergMergeIntoUpdateOperation() {
getIcebergTablePropertyValues()
.forEach(
tuple -> {
String tableName =
String.format(
"test_iceberg_%s_%s_mergeinto_update_operation", tuple._1(), tuple._2());
dropTableIfExists(tableName);
createIcebergTableWithTabProperties(
tableName,
tuple._1(),
ImmutableMap.of(
ICEBERG_FORMAT_VERSION,
String.valueOf(tuple._2()),
ICEBERG_MERGE_MODE,
tuple._3()));
checkTableColumns(tableName, getSimpleTableColumn(), getTableInfo(tableName));
checkTableUpdateByMergeInto(tableName);
});
}

private List<SparkTableInfo.SparkColumnInfo> getIcebergSimpleTableColumn() {
return Arrays.asList(
SparkTableInfo.SparkColumnInfo.of("id", DataTypes.IntegerType, "id comment"),
@@ -416,4 +534,26 @@ private SparkMetadataColumnInfo[] getIcebergMetadataColumns() {
new SparkMetadataColumnInfo("_deleted", DataTypes.BooleanType, false)
};
}

private List<Tuple3<Boolean, Integer, String>> getIcebergTablePropertyValues() {
return Arrays.asList(
new Tuple3<>(false, 1, "copy-on-write"),
new Tuple3<>(false, 2, "merge-on-read"),
new Tuple3<>(true, 1, "copy-on-write"),
new Tuple3<>(true, 2, "merge-on-read"));
}

private void createIcebergTableWithTabProperties(
String tableName, boolean isPartitioned, ImmutableMap<String, String> tblProperties) {
String partitionedClause = isPartitioned ? " PARTITIONED BY (name) " : "";
String tblPropertiesStr =
tblProperties.entrySet().stream()
.map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue()))
.collect(Collectors.joining(","));
String createSql =
String.format(
"CREATE TABLE %s (id INT COMMENT 'id comment', name STRING COMMENT '', age INT) %s TBLPROPERTIES(%s)",
tableName, partitionedClause, tblPropertiesStr);
sql(createSql);
}
}
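
The `createIcebergTableWithTabProperties` helper above renders the TBLPROPERTIES clause by quoting each key/value pair and comma-joining them. A standalone sketch of just that string-building step — the class and method names are illustrative, not part of the PR:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch of the TBLPROPERTIES rendering used when creating the test tables.
public class TblPropertiesSketch {

    static String toTblProperties(Map<String, String> props) {
        // Render each entry as 'key'='value' and comma-join, producing the
        // string interpolated into CREATE TABLE ... TBLPROPERTIES(...).
        return props.entrySet().stream()
                .map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue()))
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the output is deterministic.
        Map<String, String> props = new LinkedHashMap<>();
        props.put("format-version", "2");
        props.put("write.delete.mode", "merge-on-read");
        // prints: 'format-version'='2','write.delete.mode'='merge-on-read'
        System.out.println(toTblProperties(props));
    }
}
```

The property matrix in `getIcebergTablePropertyValues` then exercises both Iceberg table format versions with the matching write mode (copy-on-write for v1, merge-on-read for v2) through this helper.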
@@ -159,6 +159,10 @@ protected void insertTableAsSelect(String tableName, String newName) {
sql(String.format("INSERT INTO TABLE %s SELECT * FROM %s", newName, tableName));
}

protected static String getSelectAllSqlWithOrder(String tableName) {
return String.format("SELECT * FROM %s ORDER BY id", tableName);
}

private static String getSelectAllSql(String tableName) {
return String.format("SELECT * FROM %s", tableName);
}
24 changes: 24 additions & 0 deletions spark-connector/spark-connector/build.gradle.kts
@@ -5,6 +5,7 @@
plugins {
`maven-publish`
id("java")
id("scala")
id("idea")
alias(libs.plugins.shadow)
}
@@ -40,3 +41,26 @@ dependencies {
testImplementation("org.scala-lang.modules:scala-java8-compat_$scalaVersion:$scalaJava8CompatVersion")
testRuntimeOnly(libs.junit.jupiter.engine)
}

sourceSets {
main {
scala {
setSrcDirs(listOf("src/main/scala", "src/main/java"))
// Review note (@caican00, Apr 25, 2024): the GravitinoDriverPlugin references the Scala
// class, so the Scala classes need to be compiled before the Java classes.
}
java {
setSrcDirs(emptyList<String>())
}
}
test {
scala {
setSrcDirs(listOf("src/test/scala", "src/test/java"))
}
java {
setSrcDirs(emptyList<String>())
}
}
}

tasks.withType(ScalaCompile::class.java).configureEach {
scalaCompileOptions.additionalParameters = listOf("-target:jvm-1.8")
// Review note (@caican00, Apr 14, 2024): see gradle/gradle#19456 (comment).
// Without this we get "bad option: '-target:jvm-1.11'" or "bad option: '-target:jvm-1.17'";
// forcing JVM 8 compatibility seems to be the only solution.

}