Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2543] feat(spark-connector): support row-level operations to iceberg Table #3243

Merged
merged 13 commits into from
May 13, 2024
Merged
25 changes: 20 additions & 5 deletions docs/spark-connector/spark-catalog-iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ This software is licensed under the Apache License version 2."

## Capabilities

#### Support basic DML and DDL operations:
#### Support DML and DDL operations:

- `CREATE TABLE`

Expand All @@ -18,13 +18,12 @@ Supports basic create table clause including table schema, properties, partition
- `ALTER TABLE`
- `INSERT INTO&OVERWRITE`
- `SELECT`
- `DELETE`
Supports file delete only.
- `MERGE INTO`
- `DELETE FROM`
- `UPDATE`

#### Not supported operations:

- Row level operations, like `MERGE INTO`, `DELETE FROM`, `UPDATE`
- View operations.
- Branching and tagging operations.
- Spark procedures.
Expand Down Expand Up @@ -57,6 +56,22 @@ VALUES
(3, 'Charlie', 'Sales', TIMESTAMP '2021-03-01 08:45:00');

SELECT * FROM employee WHERE date(hire_date) = '2021-01-01';

UPDATE employee SET department = 'Marketing' WHERE id = 1;

DELETE FROM employee WHERE id < 2;

MERGE INTO employee
USING (SELECT 4 as id, 'David' as name, 'Engineering' as department, TIMESTAMP '2021-04-01 09:00:00' as hire_date) as new_employee
ON employee.id = new_employee.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

MERGE INTO employee
USING (SELECT 4 as id, 'David' as name, 'Engineering' as department, TIMESTAMP '2021-04-01 09:00:00' as hire_date) as new_employee
ON employee.id = new_employee.id
WHEN MATCHED THEN DELETE
WHEN NOT MATCHED THEN INSERT *;
```

## Catalog properties
Expand Down
4 changes: 4 additions & 0 deletions integration-test/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ plugins {

val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extra["defaultScalaVersion"].toString()
val sparkVersion: String = libs.versions.spark.get()
val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
val kyuubiVersion: String = libs.versions.kyuubi.get()
val icebergVersion: String = libs.versions.iceberg.get()
val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat.get()

Expand Down Expand Up @@ -114,6 +116,8 @@ dependencies {
exclude("io.dropwizard.metrics")
exclude("org.rocksdb")
}
testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")

testImplementation(libs.okhttp3.loginterceptor)
testImplementation(libs.postgresql.driver)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,39 @@ protected static String getDeleteSql(String tableName, String condition) {
return String.format("DELETE FROM %s where %s", tableName, condition);
}

// Builds an UPDATE statement: UPDATE <table> SET <assignments> WHERE <predicate>.
private static String getUpdateTableSql(String tableName, String setClause, String whereClause) {
  StringBuilder sql = new StringBuilder("UPDATE ");
  sql.append(tableName).append(" SET ").append(setClause).append(" WHERE ").append(whereClause);
  return sql.toString();
}

// Builds an upsert-style MERGE: rows of the target matching the source are fully
// updated (SET *); unmatched source rows are inserted (INSERT *).
private static String getRowLevelUpdateTableSql(
    String targetTableName, String selectClause, String sourceTableName, String onClause) {
  return String.join(
      " ",
      "MERGE INTO " + targetTableName,
      "USING (" + selectClause + ") " + sourceTableName,
      "ON " + onClause,
      "WHEN MATCHED THEN UPDATE SET *",
      "WHEN NOT MATCHED THEN INSERT *");
}

// Builds a delete-style MERGE: rows of the target matching the source are deleted;
// unmatched source rows are inserted (INSERT *).
private static String getRowLevelDeleteTableSql(
    String targetTableName, String selectClause, String sourceTableName, String onClause) {
  return String.join(
      " ",
      "MERGE INTO " + targetTableName,
      "USING (" + selectClause + ") " + sourceTableName,
      "ON " + onClause,
      "WHEN MATCHED THEN DELETE",
      "WHEN NOT MATCHED THEN INSERT *");
}

// Whether supports [CLUSTERED BY col_name3 SORTED BY col_name INTO num_buckets BUCKETS]
protected abstract boolean supportsSparkSQLClusteredBy();

// Whether the connector under test supports partitioned tables.
protected abstract boolean supportsPartition();

// Whether the connector under test supports `DELETE FROM ... WHERE ...`;
// gates testDeleteOperation via @EnabledIf("supportsDelete").
protected abstract boolean supportsDelete();

// Use a custom database not the original default database because SparkCommonIT couldn't
// read&write data to tables in default database. The main reason is default database location is
// determined by `hive.metastore.warehouse.dir` in hive-site.xml which is local HDFS address
Expand Down Expand Up @@ -702,6 +730,28 @@ void testTableOptions() {
checkTableReadWrite(tableInfo);
}

@Test
@EnabledIf("supportsDelete")
void testDeleteOperation() {
  // Verifies DELETE FROM ... WHERE removes exactly the matching rows.
  String tableName = "test_row_level_delete_table";
  dropTableIfExists(tableName);
  createSimpleTable(tableName);

  SparkTableInfo table = getTableInfo(tableName);
  checkTableColumns(tableName, getSimpleTableColumn(), table);
  // Reuse the shared seeding helper (rows 1..5) instead of duplicating the
  // INSERT statement and its verification inline.
  writeToEmptyTableAndCheckData(tableName);
  sql(getDeleteSql(tableName, "id <= 4"));
  List<String> remainingRows = getTableData(tableName);
  Assertions.assertEquals(1, remainingRows.size());
  Assertions.assertEquals("5,5,5", remainingRows.get(0));
}

protected void checkTableReadWrite(SparkTableInfo table) {
String name = table.getTableIdentifier();
boolean isPartitionTable = table.isPartitionTable();
Expand Down Expand Up @@ -760,6 +810,49 @@ protected String getExpectedTableData(SparkTableInfo table) {
.collect(Collectors.joining(","));
}

// Seeds rows 1..5, rewrites the row with id = 5 to (6, '6', 6) via UPDATE,
// then verifies the full table contents ordered by id.
protected void checkRowLevelUpdate(String tableName) {
  writeToEmptyTableAndCheckData(tableName);
  sql(getUpdateTableSql(tableName, "id = 6, name = '6', age = 6", "id = 5"));
  List<String> rows = getQueryData(getSelectAllSqlWithOrder(tableName, "id"));
  Assertions.assertEquals(5, rows.size());
  Assertions.assertEquals("1,1,1;2,2,2;3,3,3;4,4,4;6,6,6", String.join(";", rows));
}

// Seeds rows 1..5, deletes ids <= 2, then verifies the three surviving rows.
protected void checkRowLevelDelete(String tableName) {
  writeToEmptyTableAndCheckData(tableName);
  sql(getDeleteSql(tableName, "id <= 2"));
  List<String> rows = getQueryData(getSelectAllSqlWithOrder(tableName, "id"));
  Assertions.assertEquals(3, rows.size());
  Assertions.assertEquals("3,3,3;4,4,4;5,5,5", String.join(";", rows));
}

// Seeds rows 1..5, then runs a MERGE whose source holds ids {1, 6}: id = 1
// matches and is deleted, id = 6 does not match and is inserted.
protected void checkDeleteByMergeInto(String tableName) {
  writeToEmptyTableAndCheckData(tableName);

  String source = "source_table";
  String select =
      "SELECT 1 AS id, '1' AS name, 1 AS age UNION ALL SELECT 6 AS id, '6' AS name, 6 AS age";
  String on = String.format("%s.id = %s.id", tableName, source);
  sql(getRowLevelDeleteTableSql(tableName, select, source, on));

  List<String> rows = getQueryData(getSelectAllSqlWithOrder(tableName, "id"));
  Assertions.assertEquals(5, rows.size());
  Assertions.assertEquals("2,2,2;3,3,3;4,4,4;5,5,5;6,6,6", String.join(";", rows));
}

// Seeds rows 1..5, then runs an upsert MERGE whose source holds ids {1, 6}:
// id = 1 matches and is rewritten to (1, '2', 2), id = 6 is inserted.
protected void checkTableUpdateByMergeInto(String tableName) {
  writeToEmptyTableAndCheckData(tableName);

  String source = "source_table";
  String select =
      "SELECT 1 AS id, '2' AS name, 2 AS age UNION ALL SELECT 6 AS id, '6' AS name, 6 AS age";
  String on = String.format("%s.id = %s.id", tableName, source);
  sql(getRowLevelUpdateTableSql(tableName, select, source, on));

  List<String> rows = getQueryData(getSelectAllSqlWithOrder(tableName, "id"));
  Assertions.assertEquals(6, rows.size());
  Assertions.assertEquals("1,2,2;2,2,2;3,3,3;4,4,4;5,5,5;6,6,6", String.join(";", rows));
}

protected String getCreateSimpleTableString(String tableName) {
return getCreateSimpleTableString(tableName, false);
}
Expand Down Expand Up @@ -801,6 +894,16 @@ protected void checkTableColumns(
.check(tableInfo);
}

// Inserts five fixed rows (i, 'i', i) for i in 1..5 into the (assumed empty)
// table and asserts they all read back.
private void writeToEmptyTableAndCheckData(String tableName) {
  String insert =
      String.format(
          "INSERT INTO %s VALUES (1, '1', 1),(2, '2', 2),(3, '3', 3),(4, '4', 4),(5, '5', 5)",
          tableName);
  sql(insert);
  List<String> rows = getTableData(tableName);
  Assertions.assertEquals(5, rows.size());
  Assertions.assertEquals("1,1,1;2,2,2;3,3,3;4,4,4;5,5,5", String.join(";", rows));
}

// partition expression may contain "'", like a='s'/b=1
private String getPartitionExpression(SparkTableInfo table, String delimiter) {
return table.getPartitionedColumns().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ protected boolean supportsPartition() {
return true;
}

// This suite does not support row-level DELETE, so the delete tests gated by
// @EnabledIf("supportsDelete") in the common IT base class are skipped here.
@Override
protected boolean supportsDelete() {
return false;
}

@Test
void testCreateHiveFormatPartitionTable() {
String tableName = "hive_partition_table";
Expand Down
Loading
Loading