Skip to content

Commit

Permalink
[apache#2543] feat(spark-connector): support row-level operations to …
Browse files Browse the repository at this point in the history
…iceberg Table (apache#3243)

### What changes were proposed in this pull request?

- refactor table implementation, make `SparkIcebergTable` extend Iceberg
`SparkTable`, and `SparkHiveTable` extend Kyuubi `HiveTable`.

- support row-level operations to iceberg Table

```
1. update tableName set c1=v1, c2=v2, ...

2. merge into targetTable t
   using sourceTable s
   on s.key=t.key
   when matched then ...
   when not matched then ...

3. delete from table where xxx
```

### Why are the changes needed?

1. For spark-connector in Iceberg, it explicitly uses `SparkTable` to
identify whether it is an Iceberg table, so the `SparkIcebergTable` must
extend `SparkTable`.

2. support row-level operations to iceberg Table.

Fix: apache#2543

### Does this PR introduce any user-facing change?
Yes, support update ... , merge into ..., delete from ...

### How was this patch tested?
New ITs.
  • Loading branch information
caican00 authored and diqiu50 committed Jun 13, 2024
1 parent fdfedac commit 2c1d6df
Show file tree
Hide file tree
Showing 18 changed files with 531 additions and 132 deletions.
25 changes: 20 additions & 5 deletions docs/spark-connector/spark-catalog-iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ This software is licensed under the Apache License version 2."

## Capabilities

#### Support basic DML and DDL operations:
#### Support DML and DDL operations:

- `CREATE TABLE`

Expand All @@ -18,13 +18,12 @@ Supports basic create table clause including table schema, properties, partition
- `ALTER TABLE`
- `INSERT INTO&OVERWRITE`
- `SELECT`
- `DELETE`
Supports file delete only.
- `MERGE INOT`
- `DELETE FROM`
- `UPDATE`

#### Not supported operations:

- Row level operations. like `MERGE INOT`, `DELETE FROM`, `UPDATE`
- View operations.
- Branching and tagging operations.
- Spark procedures.
Expand Down Expand Up @@ -57,6 +56,22 @@ VALUES
(3, 'Charlie', 'Sales', TIMESTAMP '2021-03-01 08:45:00');

SELECT * FROM employee WHERE date(hire_date) = '2021-01-01'

UPDATE employee SET department = 'Jenny' WHERE id = 1;

DELETE FROM employee WHERE id < 2;

MERGE INTO employee
USING (SELECT 4 as id, 'David' as name, 'Engineering' as department, TIMESTAMP '2021-04-01 09:00:00' as hire_date) as new_employee
ON employee.id = new_employee.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

MERGE INTO employee
USING (SELECT 4 as id, 'David' as name, 'Engineering' as department, TIMESTAMP '2021-04-01 09:00:00' as hire_date) as new_employee
ON employee.id = new_employee.id
WHEN MATCHED THEN DELETE
WHEN NOT MATCHED THEN INSERT *;
```

## Catalog properties
Expand Down
4 changes: 4 additions & 0 deletions integration-test/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ plugins {

val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extra["defaultScalaVersion"].toString()
val sparkVersion: String = libs.versions.spark.get()
val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
val kyuubiVersion: String = libs.versions.kyuubi.get()
val icebergVersion: String = libs.versions.iceberg.get()
val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat.get()

Expand Down Expand Up @@ -114,6 +116,8 @@ dependencies {
exclude("io.dropwizard.metrics")
exclude("org.rocksdb")
}
testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")

testImplementation(libs.okhttp3.loginterceptor)
testImplementation(libs.postgresql.driver)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,39 @@ protected static String getDeleteSql(String tableName, String condition) {
return String.format("DELETE FROM %s where %s", tableName, condition);
}

private static String getUpdateTableSql(String tableName, String setClause, String whereClause) {
return String.format("UPDATE %s SET %s WHERE %s", tableName, setClause, whereClause);
}

private static String getRowLevelUpdateTableSql(
String targetTableName, String selectClause, String sourceTableName, String onClause) {
return String.format(
"MERGE INTO %s "
+ "USING (%s) %s "
+ "ON %s "
+ "WHEN MATCHED THEN UPDATE SET * "
+ "WHEN NOT MATCHED THEN INSERT *",
targetTableName, selectClause, sourceTableName, onClause);
}

private static String getRowLevelDeleteTableSql(
String targetTableName, String selectClause, String sourceTableName, String onClause) {
return String.format(
"MERGE INTO %s "
+ "USING (%s) %s "
+ "ON %s "
+ "WHEN MATCHED THEN DELETE "
+ "WHEN NOT MATCHED THEN INSERT *",
targetTableName, selectClause, sourceTableName, onClause);
}

// Whether supports [CLUSTERED BY col_name3 SORTED BY col_name INTO num_buckets BUCKETS]
protected abstract boolean supportsSparkSQLClusteredBy();

protected abstract boolean supportsPartition();

protected abstract boolean supportsDelete();

// Use a custom database not the original default database because SparkCommonIT couldn't
// read&write data to tables in default database. The main reason is default database location is
// determined by `hive.metastore.warehouse.dir` in hive-site.xml which is local HDFS address
Expand Down Expand Up @@ -702,6 +730,28 @@ void testTableOptions() {
checkTableReadWrite(tableInfo);
}

@Test
@EnabledIf("supportsDelete")
void testDeleteOperation() {
String tableName = "test_row_level_delete_table";
dropTableIfExists(tableName);
createSimpleTable(tableName);

SparkTableInfo table = getTableInfo(tableName);
checkTableColumns(tableName, getSimpleTableColumn(), table);
sql(
String.format(
"INSERT INTO %s VALUES (1, '1', 1),(2, '2', 2),(3, '3', 3),(4, '4', 4),(5, '5', 5)",
tableName));
List<String> queryResult1 = getTableData(tableName);
Assertions.assertEquals(5, queryResult1.size());
Assertions.assertEquals("1,1,1;2,2,2;3,3,3;4,4,4;5,5,5", String.join(";", queryResult1));
sql(getDeleteSql(tableName, "id <= 4"));
List<String> queryResult2 = getTableData(tableName);
Assertions.assertEquals(1, queryResult2.size());
Assertions.assertEquals("5,5,5", queryResult2.get(0));
}

protected void checkTableReadWrite(SparkTableInfo table) {
String name = table.getTableIdentifier();
boolean isPartitionTable = table.isPartitionTable();
Expand Down Expand Up @@ -760,6 +810,49 @@ protected String getExpectedTableData(SparkTableInfo table) {
.collect(Collectors.joining(","));
}

protected void checkRowLevelUpdate(String tableName) {
writeToEmptyTableAndCheckData(tableName);
String updatedValues = "id = 6, name = '6', age = 6";
sql(getUpdateTableSql(tableName, updatedValues, "id = 5"));
List<String> queryResult = getQueryData(getSelectAllSqlWithOrder(tableName, "id"));
Assertions.assertEquals(5, queryResult.size());
Assertions.assertEquals("1,1,1;2,2,2;3,3,3;4,4,4;6,6,6", String.join(";", queryResult));
}

protected void checkRowLevelDelete(String tableName) {
writeToEmptyTableAndCheckData(tableName);
sql(getDeleteSql(tableName, "id <= 2"));
List<String> queryResult = getQueryData(getSelectAllSqlWithOrder(tableName, "id"));
Assertions.assertEquals(3, queryResult.size());
Assertions.assertEquals("3,3,3;4,4,4;5,5,5", String.join(";", queryResult));
}

protected void checkDeleteByMergeInto(String tableName) {
writeToEmptyTableAndCheckData(tableName);

String sourceTableName = "source_table";
String selectClause =
"SELECT 1 AS id, '1' AS name, 1 AS age UNION ALL SELECT 6 AS id, '6' AS name, 6 AS age";
String onClause = String.format("%s.id = %s.id", tableName, sourceTableName);
sql(getRowLevelDeleteTableSql(tableName, selectClause, sourceTableName, onClause));
List<String> queryResult = getQueryData(getSelectAllSqlWithOrder(tableName, "id"));
Assertions.assertEquals(5, queryResult.size());
Assertions.assertEquals("2,2,2;3,3,3;4,4,4;5,5,5;6,6,6", String.join(";", queryResult));
}

protected void checkTableUpdateByMergeInto(String tableName) {
writeToEmptyTableAndCheckData(tableName);

String sourceTableName = "source_table";
String selectClause =
"SELECT 1 AS id, '2' AS name, 2 AS age UNION ALL SELECT 6 AS id, '6' AS name, 6 AS age";
String onClause = String.format("%s.id = %s.id", tableName, sourceTableName);
sql(getRowLevelUpdateTableSql(tableName, selectClause, sourceTableName, onClause));
List<String> queryResult = getQueryData(getSelectAllSqlWithOrder(tableName, "id"));
Assertions.assertEquals(6, queryResult.size());
Assertions.assertEquals("1,2,2;2,2,2;3,3,3;4,4,4;5,5,5;6,6,6", String.join(";", queryResult));
}

protected String getCreateSimpleTableString(String tableName) {
return getCreateSimpleTableString(tableName, false);
}
Expand Down Expand Up @@ -801,6 +894,16 @@ protected void checkTableColumns(
.check(tableInfo);
}

private void writeToEmptyTableAndCheckData(String tableName) {
sql(
String.format(
"INSERT INTO %s VALUES (1, '1', 1),(2, '2', 2),(3, '3', 3),(4, '4', 4),(5, '5', 5)",
tableName));
List<String> queryResult = getTableData(tableName);
Assertions.assertEquals(5, queryResult.size());
Assertions.assertEquals("1,1,1;2,2,2;3,3,3;4,4,4;5,5,5", String.join(";", queryResult));
}

// partition expression may contain "'", like a='s'/b=1
private String getPartitionExpression(SparkTableInfo table, String delimiter) {
return table.getPartitionedColumns().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ protected boolean supportsPartition() {
return true;
}

@Override
protected boolean supportsDelete() {
return false;
}

@Test
void testCreateHiveFormatPartitionTable() {
String tableName = "hive_partition_table";
Expand Down
Loading

0 comments on commit 2c1d6df

Please sign in to comment.