From ae715596ebc4fdad1ee1c25f44a62f1fcf9a633b Mon Sep 17 00:00:00 2001 From: caican00 Date: Wed, 1 May 2024 22:46:56 +0800 Subject: [PATCH 1/6] [#2543] feat(spark-connector): support row-level operations to iceberg Table --- integration-test/build.gradle.kts | 4 + .../integration/test/spark/SparkCommonIT.java | 103 +++++++++++++ .../test/spark/hive/SparkHiveCatalogIT.java | 5 + .../spark/iceberg/SparkIcebergCatalogIT.java | 139 ++++++++++++++++++ .../test/util/spark/SparkTableInfo.java | 42 +++++- .../test/util/spark/SparkUtilIT.java | 8 +- .../spark/connector/ConnectorConstants.java | 1 + .../spark/connector/catalog/BaseCatalog.java | 42 ++++-- .../connector/hive/GravitinoHiveCatalog.java | 13 +- .../spark/connector/hive/SparkHiveTable.java | 77 +++++++++- .../iceberg/GravitinoIcebergCatalog.java | 13 +- .../connector/iceberg/SparkIcebergTable.java | 106 +++++++++++-- .../plugin/GravitinoDriverPlugin.java | 21 ++- .../spark/connector/utils/ConnectorUtil.java | 26 ++++ .../SparkBaseTableHelper.java} | 72 ++++----- .../connector/utils/TestConnectorUtil.java | 31 ++++ 16 files changed, 605 insertions(+), 98 deletions(-) create mode 100644 spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/ConnectorUtil.java rename spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/{table/SparkBaseTable.java => utils/SparkBaseTableHelper.java} (79%) create mode 100644 spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/utils/TestConnectorUtil.java diff --git a/integration-test/build.gradle.kts b/integration-test/build.gradle.kts index 384f8417b18..95ce862da68 100644 --- a/integration-test/build.gradle.kts +++ b/integration-test/build.gradle.kts @@ -13,6 +13,8 @@ plugins { val scalaVersion: String = project.properties["scalaVersion"] as? 
String ?: extra["defaultScalaVersion"].toString() val sparkVersion: String = libs.versions.spark.get() +val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".") +val kyuubiVersion: String = libs.versions.kyuubi.get() val icebergVersion: String = libs.versions.iceberg.get() val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat.get() @@ -114,6 +116,8 @@ dependencies { exclude("io.dropwizard.metrics") exclude("org.rocksdb") } + testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion") + testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion") testImplementation(libs.okhttp3.loginterceptor) testImplementation(libs.postgresql.driver) diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java index 9dab1b46839..498a245228f 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java @@ -68,11 +68,39 @@ protected static String getDeleteSql(String tableName, String condition) { return String.format("DELETE FROM %s where %s", tableName, condition); } + private static String getUpdateTableSql(String tableName, String setClause, String whereClause) { + return String.format("UPDATE %s SET %s WHERE %s", tableName, setClause, whereClause); + } + + private static String getRowLevelUpdateTableSql( + String targetTableName, String selectClause, String sourceTableName, String onClause) { + return String.format( + "MERGE INTO %s " + + "USING (%s) %s " + + "ON %s " + + "WHEN MATCHED THEN UPDATE SET * " + + "WHEN NOT MATCHED THEN INSERT *", + targetTableName, selectClause, sourceTableName, onClause); + } + + private static String getRowLevelDeleteTableSql( + String targetTableName, String selectClause, String sourceTableName, String onClause) { + return String.format( + "MERGE INTO %s " + + "USING (%s) %s " + + "ON %s " + + "WHEN MATCHED THEN DELETE " + + "WHEN NOT MATCHED THEN INSERT *", + targetTableName, selectClause, sourceTableName, onClause); + } + // Whether supports [CLUSTERED BY col_name3 SORTED BY col_name INTO num_buckets BUCKETS] protected abstract boolean supportsSparkSQLClusteredBy(); protected abstract boolean supportsPartition(); + protected abstract boolean supportsDelete(); + // Use a custom database not the original default database because SparkCommonIT couldn't // read&write data to tables in default database. 
The main reason is default database location is // determined by `hive.metastore.warehouse.dir` in hive-site.xml which is local HDFS address @@ -702,6 +730,28 @@ void testTableOptions() { checkTableReadWrite(tableInfo); } + @Test + @EnabledIf("supportsDelete") + void testDeleteOperation() { + String tableName = "test_row_level_delete_table"; + dropTableIfExists(tableName); + createSimpleTable(tableName); + + SparkTableInfo table = getTableInfo(tableName); + checkTableColumns(tableName, getSimpleTableColumn(), table); + sql( + String.format( + "INSERT INTO %s VALUES (1, '1', 1),(2, '2', 2),(3, '3', 3),(4, '4', 4),(5, '5', 5)", + tableName)); + List queryResult1 = getTableData(tableName); + Assertions.assertEquals(5, queryResult1.size()); + Assertions.assertEquals("1,1,1;2,2,2;3,3,3;4,4,4;5,5,5", String.join(";", queryResult1)); + sql(getDeleteSql(tableName, "id <= 4")); + List queryResult2 = getTableData(tableName); + Assertions.assertEquals(1, queryResult2.size()); + Assertions.assertEquals("5,5,5", queryResult2.get(0)); + } + protected void checkTableReadWrite(SparkTableInfo table) { String name = table.getTableIdentifier(); boolean isPartitionTable = table.isPartitionTable(); @@ -760,6 +810,49 @@ protected String getExpectedTableData(SparkTableInfo table) { .collect(Collectors.joining(",")); } + protected void checkTableRowLevelUpdate(String tableName) { + writeToEmptyTableAndCheckData(tableName); + String updatedValues = "id = 6, name = '6', age = 6"; + sql(getUpdateTableSql(tableName, updatedValues, "id = 5")); + List queryResult = getQueryData(getSelectAllSqlWithOrder(tableName)); + Assertions.assertEquals(5, queryResult.size()); + Assertions.assertEquals("1,1,1;2,2,2;3,3,3;4,4,4;6,6,6", String.join(";", queryResult)); + } + + protected void checkTableRowLevelDelete(String tableName) { + writeToEmptyTableAndCheckData(tableName); + sql(getDeleteSql(tableName, "id <= 2")); + List queryResult = getQueryData(getSelectAllSqlWithOrder(tableName)); + Assertions.assertEquals(3, queryResult.size()); + Assertions.assertEquals("3,3,3;4,4,4;5,5,5", String.join(";", queryResult)); + } + + protected void checkTableDeleteByMergeInto(String tableName) { + writeToEmptyTableAndCheckData(tableName); + + String sourceTableName = "source_table"; + String selectClause = + "SELECT 1 AS id, '1' AS name, 1 AS age UNION ALL SELECT 6 AS id, '6' AS name, 6 AS age"; + String onClause = String.format("%s.id = %s.id", tableName, sourceTableName); + sql(getRowLevelDeleteTableSql(tableName, selectClause, sourceTableName, onClause)); + List queryResult = getQueryData(getSelectAllSqlWithOrder(tableName)); + Assertions.assertEquals(5, queryResult.size()); + Assertions.assertEquals("2,2,2;3,3,3;4,4,4;5,5,5;6,6,6", String.join(";", queryResult)); + } + + protected void checkTableUpdateByMergeInto(String tableName) { + writeToEmptyTableAndCheckData(tableName); + + String sourceTableName = "source_table"; + String selectClause = + "SELECT 1 AS id, '2' AS name, 2 AS age UNION ALL SELECT 6 AS id, '6' AS name, 6 AS age"; + String onClause = String.format("%s.id = %s.id", tableName, sourceTableName); + sql(getRowLevelUpdateTableSql(tableName, selectClause, sourceTableName, onClause)); + List queryResult = getQueryData(getSelectAllSqlWithOrder(tableName)); + Assertions.assertEquals(6, queryResult.size()); + Assertions.assertEquals("1,2,2;2,2,2;3,3,3;4,4,4;5,5,5;6,6,6", String.join(";", queryResult)); + } + protected String getCreateSimpleTableString(String tableName) { return getCreateSimpleTableString(tableName, false); } @@ 
-801,6 +894,16 @@ protected void checkTableColumns( .check(tableInfo); } + private void writeToEmptyTableAndCheckData(String tableName) { + sql( + String.format( + "INSERT INTO %s VALUES (1, '1', 1),(2, '2', 2),(3, '3', 3),(4, '4', 4),(5, '5', 5)", + tableName)); + List queryResult = getTableData(tableName); + Assertions.assertEquals(5, queryResult.size()); + Assertions.assertEquals("1,1,1;2,2,2;3,3,3;4,4,4;5,5,5", String.join(";", queryResult)); + } + // partition expression may contain "'", like a='s'/b=1 private String getPartitionExpression(SparkTableInfo table, String delimiter) { return table.getPartitionedColumns().stream() diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java index 1f34c87c10f..f42e0332dd1 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/hive/SparkHiveCatalogIT.java @@ -55,6 +55,11 @@ protected boolean supportsPartition() { return true; } + @Override + protected boolean supportsDelete() { + return false; + } + @Test public void testCreateHiveFormatPartitionTable() { String tableName = "hive_partition_table"; diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java index b94d6eb5e17..f7da5564809 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java @@ -9,6 +9,7 @@ import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo; import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfoChecker; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import java.io.File; import java.util.ArrayList; import java.util.Arrays; @@ -18,10 +19,13 @@ import java.util.Map; import java.util.stream.Collectors; import org.apache.hadoop.fs.Path; +import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions; +import org.apache.spark.SparkConf; import org.apache.spark.sql.Column; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; @@ -30,13 +34,21 @@ import org.apache.spark.sql.connector.catalog.FunctionCatalog; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; +import org.apache.spark.sql.internal.StaticSQLConf; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.platform.commons.util.StringUtils; +import scala.Tuple3; public abstract class SparkIcebergCatalogIT extends SparkCommonIT { + private static final String ICEBERG_FORMAT_VERSION = "format-version"; + private static final String 
ICEBERG_DELETE_MODE = "write.delete.mode"; + private static final String ICEBERG_UPDATE_MODE = "write.update.mode"; + private static final String ICEBERG_MERGE_MODE = "write.merge.mode"; + @Override protected String getCatalogName() { return "iceberg"; @@ -57,6 +69,11 @@ protected boolean supportsPartition() { return true; } + @Override + protected boolean supportsDelete() { + return true; + } + @Override protected String getTableLocation(SparkTableInfo table) { return String.join(File.separator, table.getTableLocation(), "data"); @@ -216,6 +233,24 @@ void testIcebergMetadataColumns() throws NoSuchTableException { testDeleteMetadataColumn(); } + @Test + void testInjectSparkExtensions() { + SparkSession sparkSession = getSparkSession(); + SparkConf conf = sparkSession.sparkContext().getConf(); + Assertions.assertTrue(conf.contains(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key())); + String extensions = conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key()); + Assertions.assertTrue(StringUtils.isNotBlank(extensions)); + Assertions.assertEquals(IcebergSparkSessionExtensions.class.getName(), extensions); + } + + @Test + void testIcebergTableRowLevelOperations() { + testIcebergDeleteOperation(); + testIcebergUpdateOperation(); + testIcebergMergeIntoDeleteOperation(); + testIcebergMergeIntoUpdateOperation(); + } + private void testMetadataColumns() { String tableName = "test_metadata_columns"; dropTableIfExists(tableName); @@ -386,6 +421,88 @@ private void testDeleteMetadataColumn() { Assertions.assertEquals(0, queryResult1.size()); } + private void testIcebergDeleteOperation() { + getIcebergTablePropertyValues() + .forEach( + tuple -> { + String tableName = + String.format("test_iceberg_%s_%s_delete_operation", tuple._1(), tuple._2()); + dropTableIfExists(tableName); + createIcebergTableWithTabProperties( + tableName, + tuple._1(), + ImmutableMap.of( + ICEBERG_FORMAT_VERSION, + String.valueOf(tuple._2()), + ICEBERG_DELETE_MODE, + tuple._3())); + checkTableColumns(tableName, getSimpleTableColumn(), getTableInfo(tableName)); + checkTableRowLevelDelete(tableName); + }); + } + + private void testIcebergUpdateOperation() { + getIcebergTablePropertyValues() + .forEach( + tuple -> { + String tableName = + String.format("test_iceberg_%s_%s_update_operation", tuple._1(), tuple._2()); + dropTableIfExists(tableName); + createIcebergTableWithTabProperties( + tableName, + tuple._1(), + ImmutableMap.of( + ICEBERG_FORMAT_VERSION, + String.valueOf(tuple._2()), + ICEBERG_UPDATE_MODE, + tuple._3())); + checkTableColumns(tableName, getSimpleTableColumn(), getTableInfo(tableName)); + checkTableRowLevelUpdate(tableName); + }); + } + + private void testIcebergMergeIntoDeleteOperation() { + getIcebergTablePropertyValues() + .forEach( + tuple -> { + String tableName = + String.format( + "test_iceberg_%s_%s_mergeinto_delete_operation", tuple._1(), tuple._2()); + dropTableIfExists(tableName); + createIcebergTableWithTabProperties( + tableName, + tuple._1(), + ImmutableMap.of( + ICEBERG_FORMAT_VERSION, + String.valueOf(tuple._2()), + ICEBERG_MERGE_MODE, + tuple._3())); + checkTableColumns(tableName, getSimpleTableColumn(), getTableInfo(tableName)); + checkTableDeleteByMergeInto(tableName); + }); + } + + private void testIcebergMergeIntoUpdateOperation() { + getIcebergTablePropertyValues() + .forEach( + tuple -> { + String tableName = + String.format( + "test_iceberg_%s_%s_mergeinto_update_operation", tuple._1(), tuple._2()); + dropTableIfExists(tableName); + createIcebergTableWithTabProperties( + tableName, + 
tuple._1(),
+                ImmutableMap.of(
+                    ICEBERG_FORMAT_VERSION,
+                    String.valueOf(tuple._2()),
+                    ICEBERG_MERGE_MODE,
+                    tuple._3()));
+                checkTableColumns(tableName, getSimpleTableColumn(), getTableInfo(tableName));
+                checkTableUpdateByMergeInto(tableName);
+              });
+  }
+
   private List<SparkTableInfo.SparkColumnInfo> getIcebergSimpleTableColumn() {
     return Arrays.asList(
         SparkTableInfo.SparkColumnInfo.of("id", DataTypes.IntegerType, "id comment"),
@@ -416,4 +533,26 @@ private SparkMetadataColumnInfo[] getIcebergMetadataColumns() {
       new SparkMetadataColumnInfo("_deleted", DataTypes.BooleanType, false)
     };
   }
+
+  private List<Tuple3<Boolean, Integer, String>> getIcebergTablePropertyValues() {
+    return Arrays.asList(
+        new Tuple3<>(false, 1, "copy-on-write"),
+        new Tuple3<>(false, 2, "merge-on-read"),
+        new Tuple3<>(true, 1, "copy-on-write"),
+        new Tuple3<>(true, 2, "merge-on-read"));
+  }
+
+  private void createIcebergTableWithTabProperties(
+      String tableName, boolean isPartitioned, ImmutableMap<String, String> tblProperties) {
+    String partitionedClause = isPartitioned ? " PARTITIONED BY (name) " : "";
+    String tblPropertiesStr =
+        tblProperties.entrySet().stream()
+            .map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue()))
+            .collect(Collectors.joining(","));
+    String createSql =
+        String.format(
+            "CREATE TABLE %s (id INT COMMENT 'id comment', name STRING COMMENT '', age INT) %s TBLPROPERTIES(%s)",
+            tableName, partitionedClause, tblPropertiesStr);
+    sql(createSql);
+  }
 }
diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java
index ee08de46ee9..43d3b85adfb 100644
--- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java
+++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java
@@ -6,7 +6,9 @@
 package com.datastrato.gravitino.integration.test.util.spark;
 
 import com.datastrato.gravitino.spark.connector.ConnectorConstants;
-import com.datastrato.gravitino.spark.connector.table.SparkBaseTable;
+import com.datastrato.gravitino.spark.connector.SparkTransformConverter;
+import com.datastrato.gravitino.spark.connector.hive.SparkHiveTable;
+import com.datastrato.gravitino.spark.connector.iceberg.SparkIcebergTable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -18,6 +20,7 @@
 import lombok.Data;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.sql.connector.catalog.SupportsMetadataColumns;
+import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.expressions.ApplyTransform;
 import org.apache.spark.sql.connector.expressions.BucketTransform;
@@ -29,6 +32,7 @@
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.connector.expressions.YearsTransform;
 import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructType;
 import org.junit.jupiter.api.Assertions;
 
 /** SparkTableInfo is used to check the result in test.
*/ @@ -89,7 +93,7 @@ void addPartition(Transform partition) { } } - static SparkTableInfo create(SparkBaseTable baseTable) { + static SparkTableInfo create(Table baseTable) { SparkTableInfo sparkTableInfo = new SparkTableInfo(); String identifier = baseTable.name(); String[] items = identifier.split("\\."); @@ -98,7 +102,7 @@ static SparkTableInfo create(SparkBaseTable baseTable) { sparkTableInfo.tableName = items[1]; sparkTableInfo.database = items[0]; sparkTableInfo.columns = - Arrays.stream(baseTable.schema().fields()) + Arrays.stream(getSchema(baseTable).fields()) .map( sparkField -> new SparkColumnInfo( @@ -110,7 +114,7 @@ static SparkTableInfo create(SparkBaseTable baseTable) { sparkTableInfo.comment = baseTable.properties().remove(ConnectorConstants.COMMENT); sparkTableInfo.tableProperties = baseTable.properties(); boolean supportsBucketPartition = - baseTable.getSparkTransformConverter().isSupportsBucketPartition(); + getSparkTransformConverter(baseTable).isSupportsBucketPartition(); Arrays.stream(baseTable.partitioning()) .forEach( transform -> { @@ -149,10 +153,6 @@ static SparkTableInfo create(SparkBaseTable baseTable) { return sparkTableInfo; } - private static boolean isBucketPartition(boolean supportsBucketPartition, Transform transform) { - return supportsBucketPartition && !(transform instanceof SortedBucketTransform); - } - public List getUnPartitionedColumns() { return columns.stream() .filter(column -> !partitionColumnNames.contains(column.name)) @@ -165,6 +165,32 @@ public List getPartitionedColumns() { .collect(Collectors.toList()); } + private static boolean isBucketPartition(boolean supportsBucketPartition, Transform transform) { + return supportsBucketPartition && !(transform instanceof SortedBucketTransform); + } + + private static SparkTransformConverter getSparkTransformConverter(Table baseTable) { + if (baseTable instanceof SparkHiveTable) { + return ((SparkHiveTable) baseTable).getSparkTransformConverter(); + } else if (baseTable instanceof SparkIcebergTable) { + return ((SparkIcebergTable) baseTable).getSparkTransformConverter(); + } else { + throw new IllegalArgumentException( + "Doesn't support Spark table: " + baseTable.getClass().getName()); + } + } + + private static StructType getSchema(Table baseTable) { + if (baseTable instanceof SparkHiveTable) { + return ((SparkHiveTable) baseTable).schema(); + } else if (baseTable instanceof SparkIcebergTable) { + return ((SparkIcebergTable) baseTable).schema(); + } else { + throw new IllegalArgumentException( + "Doesn't support Spark table: " + baseTable.getClass().getName()); + } + } + @Data public static class SparkColumnInfo { private String name; diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkUtilIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkUtilIT.java index 2603fbe8f73..bad6fa0cb62 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkUtilIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkUtilIT.java @@ -20,7 +20,6 @@ package com.datastrato.gravitino.integration.test.util.spark; import com.datastrato.gravitino.integration.test.util.AbstractIT; -import com.datastrato.gravitino.spark.connector.table.SparkBaseTable; import java.sql.Timestamp; import java.text.SimpleDateFormat; import java.util.Arrays; @@ -130,8 +129,7 @@ protected SparkTableInfo getTableInfo(String tableName) { CommandResult result = 
(CommandResult) ds.logicalPlan(); DescribeRelation relation = (DescribeRelation) result.commandLogicalPlan(); ResolvedTable table = (ResolvedTable) relation.child(); - SparkBaseTable baseTable = (SparkBaseTable) table.table(); - return SparkTableInfo.create(baseTable); + return SparkTableInfo.create(table.table()); } protected void dropTableIfExists(String tableName) { @@ -159,6 +157,10 @@ protected void insertTableAsSelect(String tableName, String newName) { sql(String.format("INSERT INTO TABLE %s SELECT * FROM %s", newName, tableName)); } + protected static String getSelectAllSqlWithOrder(String tableName) { + return String.format("SELECT * FROM %s ORDER BY id", tableName); + } + private static String getSelectAllSql(String tableName) { return String.format("SELECT * FROM %s", tableName); } diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/ConnectorConstants.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/ConnectorConstants.java index 3a49a21470f..9758ff42196 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/ConnectorConstants.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/ConnectorConstants.java @@ -14,6 +14,7 @@ public class ConnectorConstants { public static final String LOCATION = "location"; public static final String DOT = "."; + public static final String COMMA = ","; private ConnectorConstants() {} } diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java index f5994b4ce86..1cfc98de6ef 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java @@ -19,7 +19,6 @@ import com.datastrato.gravitino.spark.connector.SparkTransformConverter; import com.datastrato.gravitino.spark.connector.SparkTransformConverter.DistributionAndSortOrdersInfo; import com.datastrato.gravitino.spark.connector.SparkTypeConverter; -import com.datastrato.gravitino.spark.connector.table.SparkBaseTable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.Arrays; @@ -27,6 +26,7 @@ import java.util.Map; import java.util.Optional; import javax.ws.rs.NotSupportedException; +import lombok.SneakyThrows; import org.apache.commons.lang3.StringUtils; import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; @@ -93,15 +93,17 @@ protected abstract TableCatalog createAndInitSparkCatalog( * * @param identifier Spark's table identifier * @param gravitinoTable Gravitino table to do DDL operations + * @param sparkTable specific Spark table to do IO operations * @param sparkCatalog specific Spark catalog to do IO operations * @param propertiesConverter transform properties between Gravitino and Spark * @param sparkTransformConverter sparkTransformConverter convert transforms between Gravitino and * Spark * @return a specific Spark table */ - protected abstract SparkBaseTable createSparkTable( + protected abstract Table createSparkTable( Identifier identifier, com.datastrato.gravitino.rel.Table gravitinoTable, + Table 
sparkTable, TableCatalog sparkCatalog, PropertiesConverter propertiesConverter, SparkTransformConverter sparkTransformConverter); @@ -162,10 +164,10 @@ public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceExcepti } } + @SneakyThrows @Override public Table createTable( - Identifier ident, Column[] columns, Transform[] transforms, Map properties) - throws TableAlreadyExistsException, NoSuchNamespaceException { + Identifier ident, Column[] columns, Transform[] transforms, Map properties) { NameIdentifier gravitinoIdentifier = NameIdentifier.of(metalakeName, catalogName, getDatabase(ident), ident.name()); com.datastrato.gravitino.rel.Column[] gravitinoColumns = @@ -184,7 +186,7 @@ public Table createTable( sparkTransformConverter.toGravitinoPartitionings(transforms); try { - com.datastrato.gravitino.rel.Table table = + com.datastrato.gravitino.rel.Table gravitinoTable = gravitinoCatalogClient .asTableCatalog() .createTable( @@ -195,12 +197,20 @@ public Table createTable( partitionings, distributionAndSortOrdersInfo.getDistribution(), distributionAndSortOrdersInfo.getSortOrders()); + Table sparkTable = sparkCatalog.loadTable(ident); return createSparkTable( - ident, table, sparkCatalog, propertiesConverter, sparkTransformConverter); + ident, + gravitinoTable, + sparkTable, + sparkCatalog, + propertiesConverter, + sparkTransformConverter); } catch (NoSuchSchemaException e) { throw new NoSuchNamespaceException(ident.namespace()); } catch (com.datastrato.gravitino.exceptions.TableAlreadyExistsException e) { throw new TableAlreadyExistsException(ident); + } catch (NoSuchTableException e) { + throw new NoSuchTableException(ident); } } @@ -208,13 +218,19 @@ public Table createTable( public Table loadTable(Identifier ident) throws NoSuchTableException { try { String database = getDatabase(ident); - com.datastrato.gravitino.rel.Table table = + com.datastrato.gravitino.rel.Table gravitinoTable = gravitinoCatalogClient .asTableCatalog() .loadTable(NameIdentifier.of(metalakeName, catalogName, database, ident.name())); + Table sparkTable = sparkCatalog.loadTable(ident); // Will create a catalog specific table return createSparkTable( - ident, table, sparkCatalog, propertiesConverter, sparkTransformConverter); + ident, + gravitinoTable, + sparkTable, + sparkCatalog, + propertiesConverter, + sparkTransformConverter); } catch (com.datastrato.gravitino.exceptions.NoSuchTableException e) { throw new NoSuchTableException(ident); } @@ -235,14 +251,20 @@ public Table alterTable(Identifier ident, TableChange... 
changes) throws NoSuchT .map(BaseCatalog::transformTableChange) .toArray(com.datastrato.gravitino.rel.TableChange[]::new); try { - com.datastrato.gravitino.rel.Table table = + com.datastrato.gravitino.rel.Table gravitinoTable = gravitinoCatalogClient .asTableCatalog() .alterTable( NameIdentifier.of(metalakeName, catalogName, getDatabase(ident), ident.name()), gravitinoTableChanges); + Table sparkTable = sparkCatalog.loadTable(ident); return createSparkTable( - ident, table, sparkCatalog, propertiesConverter, sparkTransformConverter); + ident, + gravitinoTable, + sparkTable, + sparkCatalog, + propertiesConverter, + sparkTransformConverter); } catch (com.datastrato.gravitino.exceptions.NoSuchTableException e) { throw new NoSuchTableException(ident); } diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java index 6ffca1ff9f4..a1cefdaf3a9 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java @@ -10,7 +10,6 @@ import com.datastrato.gravitino.spark.connector.PropertiesConverter; import com.datastrato.gravitino.spark.connector.SparkTransformConverter; import com.datastrato.gravitino.spark.connector.catalog.BaseCatalog; -import com.datastrato.gravitino.spark.connector.table.SparkBaseTable; import com.google.common.base.Preconditions; import java.util.HashMap; import java.util.Map; @@ -42,14 +41,20 @@ protected TableCatalog createAndInitSparkCatalog( } @Override - protected SparkBaseTable createSparkTable( + protected org.apache.spark.sql.connector.catalog.Table createSparkTable( Identifier identifier, Table gravitinoTable, - TableCatalog sparkCatalog, + org.apache.spark.sql.connector.catalog.Table sparkTable, + TableCatalog sparkHiveCatalog, PropertiesConverter propertiesConverter, SparkTransformConverter sparkTransformConverter) { return new SparkHiveTable( - identifier, gravitinoTable, sparkCatalog, propertiesConverter, sparkTransformConverter); + identifier, + gravitinoTable, + sparkTable, + sparkHiveCatalog, + propertiesConverter, + sparkTransformConverter); } @Override diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java index 91f9468178b..48c88f1262d 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java @@ -8,23 +8,86 @@ import com.datastrato.gravitino.rel.Table; import com.datastrato.gravitino.spark.connector.PropertiesConverter; import com.datastrato.gravitino.spark.connector.SparkTransformConverter; -import com.datastrato.gravitino.spark.connector.table.SparkBaseTable; +import com.datastrato.gravitino.spark.connector.utils.SparkBaseTableHelper; +import com.google.common.annotations.VisibleForTesting; +import java.util.Map; +import java.util.Set; +import org.apache.kyuubi.spark.connector.hive.HiveTable; +import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog; +import org.apache.spark.sql.SparkSession; import 
org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCapability;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/** Keeps behavior consistent with SparkIcebergTable. */
+public class SparkHiveTable extends HiveTable {
+
+  private SparkBaseTableHelper sparkBaseTableHelper;
-/** May support more capabilities like partition management. */
-public class SparkHiveTable extends SparkBaseTable {
   public SparkHiveTable(
       Identifier identifier,
       Table gravitinoTable,
-      TableCatalog sparkCatalog,
+      org.apache.spark.sql.connector.catalog.Table sparkHiveTable,
+      TableCatalog sparkHiveCatalog,
       PropertiesConverter propertiesConverter,
       SparkTransformConverter sparkTransformConverter) {
-    super(identifier, gravitinoTable, sparkCatalog, propertiesConverter, sparkTransformConverter);
+    super(
+        SparkSession.active(),
+        ((HiveTable) sparkHiveTable).catalogTable(),
+        (HiveTableCatalog) sparkHiveCatalog);
+    this.sparkBaseTableHelper =
+        new SparkBaseTableHelper(
+            identifier,
+            gravitinoTable,
+            sparkHiveTable,
+            propertiesConverter,
+            sparkTransformConverter);
+  }
+
+  @Override
+  public String name() {
+    return sparkBaseTableHelper.name(false);
+  }
+
+  @Override
+  @SuppressWarnings("deprecation")
+  public StructType schema() {
+    return sparkBaseTableHelper.schema();
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    return sparkBaseTableHelper.properties();
   }
 
   @Override
-  protected boolean isCaseSensitive() {
-    return false;
+  public Transform[] partitioning() {
+    return sparkBaseTableHelper.partitioning();
+  }
+
+  @Override
+  public Set<TableCapability> capabilities() {
+    return sparkBaseTableHelper.capabilities();
+  }
+
+  @Override
+  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+    return sparkBaseTableHelper.newScanBuilder(options);
+  }
+
+  @Override
+  public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
+    return sparkBaseTableHelper.newWriteBuilder(info);
+  }
+
+  @VisibleForTesting
+  public SparkTransformConverter getSparkTransformConverter() {
+    return sparkBaseTableHelper.getSparkTransformConverter();
   }
 }
diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
index f7a028cad7a..5355dbc3dfd 100644
--- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
+++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java
@@ -9,7 +9,6 @@
 import com.datastrato.gravitino.spark.connector.PropertiesConverter;
 import com.datastrato.gravitino.spark.connector.SparkTransformConverter;
 import com.datastrato.gravitino.spark.connector.catalog.BaseCatalog;
-import com.datastrato.gravitino.spark.connector.table.SparkBaseTable;
 import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import java.util.Locale;
@@ -66,14 +65,20 @@ protected TableCatalog createAndInitSparkCatalog(
   }
 
   @Override
-  protected SparkBaseTable createSparkTable(
+  protected
org.apache.spark.sql.connector.catalog.Table createSparkTable(
       Identifier identifier,
       Table gravitinoTable,
-      TableCatalog sparkCatalog,
+      org.apache.spark.sql.connector.catalog.Table sparkTable,
+      TableCatalog sparkIcebergCatalog,
       PropertiesConverter propertiesConverter,
       SparkTransformConverter sparkTransformConverter) {
     return new SparkIcebergTable(
-        identifier, gravitinoTable, sparkCatalog, propertiesConverter, sparkTransformConverter);
+        identifier,
+        gravitinoTable,
+        sparkTable,
+        sparkIcebergCatalog,
+        propertiesConverter,
+        sparkTransformConverter);
   }
 
   @Override
diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java
index 22dd0bb73a8..14afb7ef136 100644
--- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java
+++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java
@@ -8,43 +8,125 @@
 import com.datastrato.gravitino.rel.Table;
 import com.datastrato.gravitino.spark.connector.PropertiesConverter;
 import com.datastrato.gravitino.spark.connector.SparkTransformConverter;
-import com.datastrato.gravitino.spark.connector.table.SparkBaseTable;
+import com.datastrato.gravitino.spark.connector.utils.SparkBaseTableHelper;
+import com.google.common.annotations.VisibleForTesting;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.MetadataColumn;
 import org.apache.spark.sql.connector.catalog.SupportsDelete;
 import org.apache.spark.sql.connector.catalog.SupportsMetadataColumns;
+import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations;
+import org.apache.spark.sql.connector.catalog.TableCapability;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.RowLevelOperationBuilder;
+import org.apache.spark.sql.connector.write.RowLevelOperationInfo;
+import org.apache.spark.sql.connector.write.WriteBuilder;
 import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
-public class SparkIcebergTable extends SparkBaseTable
-    implements SupportsDelete, SupportsMetadataColumns {
+/**
+ * The Iceberg Spark connector explicitly uses the SparkTable type to identify Iceberg tables, so
+ * SparkIcebergTable must extend SparkTable.
+ */
+public class SparkIcebergTable extends SparkTable {
+
+  private SparkBaseTableHelper sparkBaseTableHelper;
 
   public SparkIcebergTable(
       Identifier identifier,
       Table gravitinoTable,
+      org.apache.spark.sql.connector.catalog.Table sparkIcebergTable,
       TableCatalog sparkIcebergCatalog,
       PropertiesConverter propertiesConverter,
       SparkTransformConverter sparkTransformConverter) {
-    super(
-        identifier,
-        gravitinoTable,
-        sparkIcebergCatalog,
-        propertiesConverter,
-        sparkTransformConverter);
+    super(((SparkTable) sparkIcebergTable).table(), !isCacheEnabled(sparkIcebergCatalog));
+    this.sparkBaseTableHelper =
+        new SparkBaseTableHelper(
+            identifier,
+            gravitinoTable,
+            sparkIcebergTable,
+            propertiesConverter,
+            sparkTransformConverter);
+  }
+
+  @Override
+  public String name() {
+    return sparkBaseTableHelper.name(true);
+  }
+
+  @Override
+  @SuppressWarnings("deprecation")
+  public StructType schema() {
+    return sparkBaseTableHelper.schema();
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    return sparkBaseTableHelper.properties();
+  }
+
+  @Override
+  public Transform[] partitioning() {
+    return sparkBaseTableHelper.partitioning();
+  }
+
+  @Override
+  public Set<TableCapability> capabilities() {
+    return sparkBaseTableHelper.capabilities();
+  }
+
+  @Override
+  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+    return sparkBaseTableHelper.newScanBuilder(options);
+  }
+
+  @Override
+  public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
+    return sparkBaseTableHelper.newWriteBuilder(info);
   }
 
   @Override
   public boolean canDeleteWhere(Filter[] filters) {
-    return ((SupportsDelete) getSparkTable()).canDeleteWhere(filters);
+    return ((SupportsDelete) sparkBaseTableHelper.getSparkTable()).canDeleteWhere(filters);
   }
 
   @Override
   public void deleteWhere(Filter[] filters) {
-    ((SupportsDelete) getSparkTable()).deleteWhere(filters);
+    ((SupportsDelete) sparkBaseTableHelper.getSparkTable()).deleteWhere(filters);
   }
 
   @Override
   public MetadataColumn[] metadataColumns() {
-    return ((SupportsMetadataColumns) getSparkTable()).metadataColumns();
+    return ((SupportsMetadataColumns) sparkBaseTableHelper.getSparkTable()).metadataColumns();
+  }
+
+  @Override
+  public RowLevelOperationBuilder newRowLevelOperationBuilder(RowLevelOperationInfo info) {
+    return ((SupportsRowLevelOperations) sparkBaseTableHelper.getSparkTable())
+        .newRowLevelOperationBuilder(info);
+  }
+
+  @VisibleForTesting
+  public SparkTransformConverter getSparkTransformConverter() {
+    return sparkBaseTableHelper.getSparkTransformConverter();
+  }
+
+  private static boolean isCacheEnabled(TableCatalog sparkIcebergCatalog) {
+    try {
+      SparkCatalog catalog = ((SparkCatalog) sparkIcebergCatalog);
+      Field cacheEnabled = catalog.getClass().getDeclaredField("cacheEnabled");
+      cacheEnabled.setAccessible(true);
+      return cacheEnabled.getBoolean(catalog);
+    } catch (NoSuchFieldException | IllegalAccessException e) {
+      throw new RuntimeException("Failed to get cacheEnabled field from SparkCatalog", e);
+    }
   }
 }
diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java
index 3f830de2cdc..201666cc004 100644
--- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java
+++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java
@@ -5,6
+5,8 @@ package com.datastrato.gravitino.spark.connector.plugin; +import static com.datastrato.gravitino.spark.connector.utils.ConnectorUtil.removeDuplicates; + import com.datastrato.gravitino.Catalog; import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig; import com.datastrato.gravitino.spark.connector.catalog.GravitinoCatalogManager; @@ -15,10 +17,12 @@ import java.util.Locale; import java.util.Map; import org.apache.commons.lang3.StringUtils; +import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.api.plugin.DriverPlugin; import org.apache.spark.api.plugin.PluginContext; +import org.apache.spark.sql.internal.StaticSQLConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +34,8 @@ public class GravitinoDriverPlugin implements DriverPlugin { private static final Logger LOG = LoggerFactory.getLogger(GravitinoDriverPlugin.class); private GravitinoCatalogManager catalogManager; + private static final String[] GRAVITINO_DRIVER_EXTENSIONS = + new String[] {IcebergSparkSessionExtensions.class.getName()}; @Override public Map init(SparkContext sc, PluginContext pluginContext) { @@ -48,7 +54,7 @@ public Map init(SparkContext sc, PluginContext pluginContext) { catalogManager = GravitinoCatalogManager.create(gravitinoUri, metalake); catalogManager.loadRelationalCatalogs(); registerGravitinoCatalogs(conf, catalogManager.getCatalogs()); - registerSqlExtensions(); + registerSqlExtensions(conf); return Collections.emptyMap(); } @@ -103,6 +109,15 @@ private void registerCatalog(SparkConf sparkConf, String catalogName, String pro LOG.info("Register {} catalog to Spark catalog manager.", catalogName); } - // Todo inject Iceberg extensions - private void registerSqlExtensions() {} + private void registerSqlExtensions(SparkConf conf) { + String gravitinoDriverExtensions = String.join(",", GRAVITINO_DRIVER_EXTENSIONS); + if (conf.contains(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key())) { + String sparkSessionExtensions = conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key()); + conf.set( + StaticSQLConf.SPARK_SESSION_EXTENSIONS().key(), + removeDuplicates(GRAVITINO_DRIVER_EXTENSIONS, sparkSessionExtensions)); + } else { + conf.set(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key(), gravitinoDriverExtensions); + } + } } diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/ConnectorUtil.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/ConnectorUtil.java new file mode 100644 index 00000000000..eeaa56c9da2 --- /dev/null +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/ConnectorUtil.java @@ -0,0 +1,26 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */
+
+package com.datastrato.gravitino.spark.connector.utils;
+
+import static com.datastrato.gravitino.spark.connector.ConnectorConstants.COMMA;
+
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+
+public class ConnectorUtil {
+
+  public static String removeDuplicates(String[] elements, String otherElements) {
+    Set<String> uniqueElements = new LinkedHashSet<>(Arrays.asList(elements));
+    if (StringUtils.isNotBlank(otherElements)) {
+      uniqueElements.addAll(Arrays.asList(otherElements.split(COMMA)));
+    }
+    return uniqueElements.stream()
+        .reduce((element1, element2) -> element1 + COMMA + element2)
+        .orElse("");
+  }
+}
diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/table/SparkBaseTable.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/SparkBaseTableHelper.java
similarity index 79%
rename from spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/table/SparkBaseTable.java
rename to spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/SparkBaseTableHelper.java
index d1333135f19..461ce49384a 100644
--- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/table/SparkBaseTable.java
+++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/SparkBaseTableHelper.java
@@ -3,7 +3,7 @@
  * This software is licensed under the Apache License version 2.
  */
 
-package com.datastrato.gravitino.spark.connector.table;
+package com.datastrato.gravitino.spark.connector.utils;
 
 import com.datastrato.gravitino.rel.expressions.distributions.Distribution;
 import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
@@ -11,7 +11,6 @@
 import com.datastrato.gravitino.spark.connector.PropertiesConverter;
 import com.datastrato.gravitino.spark.connector.SparkTransformConverter;
 import com.datastrato.gravitino.spark.connector.SparkTypeConverter;
-import com.google.common.annotations.VisibleForTesting;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -19,13 +18,11 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.SupportsRead;
 import org.apache.spark.sql.connector.catalog.SupportsWrite;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCapability;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.connector.read.ScanBuilder;
 import org.apache.spark.sql.connector.write.LogicalWriteInfo;
@@ -42,34 +39,31 @@
 * could implement more capabilities like SupportsPartitionManagement for Hive table, SupportsIndex
 * for JDBC table, SupportsRowLevelOperations for Iceberg table.
*/ -public abstract class SparkBaseTable implements Table, SupportsRead, SupportsWrite { +public class SparkBaseTableHelper { + private Identifier identifier; private com.datastrato.gravitino.rel.Table gravitinoTable; - private TableCatalog sparkCatalog; - private Table lazySparkTable; + private Table sparkTable; private PropertiesConverter propertiesConverter; private SparkTransformConverter sparkTransformConverter; - public SparkBaseTable( + public SparkBaseTableHelper( Identifier identifier, com.datastrato.gravitino.rel.Table gravitinoTable, - TableCatalog sparkCatalog, + Table sparkTable, PropertiesConverter propertiesConverter, SparkTransformConverter sparkTransformConverter) { this.identifier = identifier; this.gravitinoTable = gravitinoTable; - this.sparkCatalog = sparkCatalog; + this.sparkTable = sparkTable; this.propertiesConverter = propertiesConverter; this.sparkTransformConverter = sparkTransformConverter; } - @Override - public String name() { - return getNormalizedIdentifier(identifier, gravitinoTable.name()); + public String name(boolean isCaseSensitive) { + return getNormalizedIdentifier(identifier, gravitinoTable.name(), isCaseSensitive); } - @Override - @SuppressWarnings("deprecation") public StructType schema() { List structs = Arrays.stream(gravitinoTable.columns()) @@ -93,7 +87,6 @@ public StructType schema() { return DataTypes.createStructType(structs); } - @Override public Map properties() { Map properties = new HashMap(); if (gravitinoTable.properties() != null) { @@ -110,59 +103,44 @@ public Map properties() { return properties; } - @Override + public Transform[] partitioning() { + com.datastrato.gravitino.rel.expressions.transforms.Transform[] partitions = + gravitinoTable.partitioning(); + Distribution distribution = gravitinoTable.distribution(); + SortOrder[] sortOrders = gravitinoTable.sortOrder(); + return sparkTransformConverter.toSparkTransform(partitions, distribution, sortOrders); + } + public Set capabilities() { - return getSparkTable().capabilities(); + return sparkTable.capabilities(); } - @Override public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { - return ((SupportsRead) getSparkTable()).newScanBuilder(options); + return ((SupportsRead) sparkTable).newScanBuilder(options); } - @Override public WriteBuilder newWriteBuilder(LogicalWriteInfo info) { - return ((SupportsWrite) getSparkTable()).newWriteBuilder(info); - } - - @Override - public Transform[] partitioning() { - com.datastrato.gravitino.rel.expressions.transforms.Transform[] partitions = - gravitinoTable.partitioning(); - Distribution distribution = gravitinoTable.distribution(); - SortOrder[] sortOrders = gravitinoTable.sortOrder(); - return sparkTransformConverter.toSparkTransform(partitions, distribution, sortOrders); + return ((SupportsWrite) sparkTable).newWriteBuilder(info); } - protected Table getSparkTable() { - if (lazySparkTable == null) { - try { - this.lazySparkTable = sparkCatalog.loadTable(identifier); - } catch (NoSuchTableException e) { - throw new RuntimeException(e); - } - } - return lazySparkTable; + public Table getSparkTable() { + return sparkTable; } - @VisibleForTesting public SparkTransformConverter getSparkTransformConverter() { return sparkTransformConverter; } - protected boolean isCaseSensitive() { - return true; - } - // The underlying catalogs may not case-sensitive, to keep consistent with the action of SparkSQL, // we should return normalized identifiers. 
- private String getNormalizedIdentifier(Identifier tableIdentifier, String gravitinoTableName) { + private String getNormalizedIdentifier( + Identifier tableIdentifier, String gravitinoTableName, boolean isCaseSensitive) { if (tableIdentifier.namespace().length == 0) { return gravitinoTableName; } String databaseName = tableIdentifier.namespace()[0]; - if (isCaseSensitive() == false) { + if (!isCaseSensitive) { databaseName = databaseName.toLowerCase(Locale.ROOT); } diff --git a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/utils/TestConnectorUtil.java b/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/utils/TestConnectorUtil.java new file mode 100644 index 00000000000..81d452d28e8 --- /dev/null +++ b/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/utils/TestConnectorUtil.java @@ -0,0 +1,31 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.spark.connector.utils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class TestConnectorUtil { + + @Test + void testRemoveDuplicates() { + String[] elements = {"a", "b", "c"}; + String otherElements = "a,d,e"; + String result = ConnectorUtil.removeDuplicates(elements, otherElements); + Assertions.assertEquals(result, "a,b,c,d,e"); + + elements = new String[] {"a", "a", "b", "c"}; + otherElements = ""; + result = ConnectorUtil.removeDuplicates(elements, otherElements); + Assertions.assertEquals(result, "a,b,c"); + + elements = new String[] {"a", "a", "b", "c"}; + result = ConnectorUtil.removeDuplicates(elements, null); + Assertions.assertEquals(result, "a,b,c"); + } +} From 3f4d6b620b9ceb84c88a916e6374dc776e2f9378 Mon Sep 17 00:00:00 2001 From: caican00 Date: Tue, 7 May 2024 17:10:23 +0800 Subject: [PATCH 2/6] update --- .../integration/test/spark/SparkCommonIT.java | 14 +- .../spark/iceberg/SparkIcebergCatalogIT.java | 206 +++++++++--------- .../test/util/spark/SparkTableInfo.java | 2 + .../test/util/spark/SparkUtilIT.java | 4 +- .../spark/connector/catalog/BaseCatalog.java | 18 +- .../spark/connector/hive/SparkHiveTable.java | 38 ++-- .../connector/iceberg/SparkIcebergTable.java | 47 ++-- .../plugin/GravitinoDriverPlugin.java | 16 +- .../spark/connector/utils/ConnectorUtil.java | 10 +- ...per.java => GravitinoTableInfoHelper.java} | 37 +--- .../connector/utils/TestConnectorUtil.java | 24 +- 11 files changed, 206 insertions(+), 210 deletions(-) rename spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/{SparkBaseTableHelper.java => GravitinoTableInfoHelper.java} (76%) diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java index 498a245228f..bd44fd33374 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkCommonIT.java @@ -810,24 +810,24 @@ protected String getExpectedTableData(SparkTableInfo table) { .collect(Collectors.joining(",")); } - protected void checkTableRowLevelUpdate(String tableName) { + protected void checkRowLevelUpdate(String tableName) { 
writeToEmptyTableAndCheckData(tableName); String updatedValues = "id = 6, name = '6', age = 6"; sql(getUpdateTableSql(tableName, updatedValues, "id = 5")); - List queryResult = getQueryData(getSelectAllSqlWithOrder(tableName)); + List queryResult = getQueryData(getSelectAllSqlWithOrder(tableName, "id")); Assertions.assertEquals(5, queryResult.size()); Assertions.assertEquals("1,1,1;2,2,2;3,3,3;4,4,4;6,6,6", String.join(";", queryResult)); } - protected void checkTableRowLevelDelete(String tableName) { + protected void checkRowLevelDelete(String tableName) { writeToEmptyTableAndCheckData(tableName); sql(getDeleteSql(tableName, "id <= 2")); - List queryResult = getQueryData(getSelectAllSqlWithOrder(tableName)); + List queryResult = getQueryData(getSelectAllSqlWithOrder(tableName, "id")); Assertions.assertEquals(3, queryResult.size()); Assertions.assertEquals("3,3,3;4,4,4;5,5,5", String.join(";", queryResult)); } - protected void checkTableDeleteByMergeInto(String tableName) { + protected void checkDeleteByMergeInto(String tableName) { writeToEmptyTableAndCheckData(tableName); String sourceTableName = "source_table"; @@ -835,7 +835,7 @@ protected void checkTableDeleteByMergeInto(String tableName) { "SELECT 1 AS id, '1' AS name, 1 AS age UNION ALL SELECT 6 AS id, '6' AS name, 6 AS age"; String onClause = String.format("%s.id = %s.id", tableName, sourceTableName); sql(getRowLevelDeleteTableSql(tableName, selectClause, sourceTableName, onClause)); - List queryResult = getQueryData(getSelectAllSqlWithOrder(tableName)); + List queryResult = getQueryData(getSelectAllSqlWithOrder(tableName, "id")); Assertions.assertEquals(5, queryResult.size()); Assertions.assertEquals("2,2,2;3,3,3;4,4,4;5,5,5;6,6,6", String.join(";", queryResult)); } @@ -848,7 +848,7 @@ protected void checkTableUpdateByMergeInto(String tableName) { "SELECT 1 AS id, '2' AS name, 2 AS age UNION ALL SELECT 6 AS id, '6' AS name, 6 AS age"; String onClause = String.format("%s.id = %s.id", tableName, sourceTableName); sql(getRowLevelUpdateTableSql(tableName, selectClause, sourceTableName, onClause)); - List queryResult = getQueryData(getSelectAllSqlWithOrder(tableName)); + List queryResult = getQueryData(getSelectAllSqlWithOrder(tableName, "id")); Assertions.assertEquals(6, queryResult.size()); Assertions.assertEquals("1,2,2;2,2,2;3,3,3;4,4,4;5,5,5;6,6,6", String.join(";", queryResult)); } diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java index f7da5564809..a757db1be5d 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java @@ -18,14 +18,12 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import lombok.Data; import org.apache.hadoop.fs.Path; -import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions; -import org.apache.spark.SparkConf; import org.apache.spark.sql.Column; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; import 
org.apache.spark.sql.catalyst.analysis.NoSuchTableException; @@ -34,13 +32,12 @@ import org.apache.spark.sql.connector.catalog.FunctionCatalog; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; -import org.apache.spark.sql.internal.StaticSQLConf; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.junit.platform.commons.util.StringUtils; -import scala.Tuple3; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; public abstract class SparkIcebergCatalogIT extends SparkCommonIT { @@ -233,22 +230,13 @@ void testIcebergMetadataColumns() throws NoSuchTableException { testDeleteMetadataColumn(); } - @Test - void testInjectSparkExtensions() { - SparkSession sparkSession = getSparkSession(); - SparkConf conf = sparkSession.sparkContext().getConf(); - Assertions.assertTrue(conf.contains(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key())); - String extensions = conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key()); - Assertions.assertTrue(StringUtils.isNotBlank(extensions)); - Assertions.assertEquals(IcebergSparkSessionExtensions.class.getName(), extensions); - } - - @Test - void testIcebergTableRowLevelOperations() { - testIcebergDeleteOperation(); - testIcebergUpdateOperation(); - testIcebergMergeIntoDeleteOperation(); - testIcebergMergeIntoUpdateOperation(); + @ParameterizedTest + @MethodSource("getIcebergTablePropertyValues") + void testIcebergTableRowLevelOperations(IcebergTableWriteProperties icebergTableWriteProperties) { + testIcebergDeleteOperation(icebergTableWriteProperties); + testIcebergUpdateOperation(icebergTableWriteProperties); + testIcebergMergeIntoDeleteOperation(icebergTableWriteProperties); + testIcebergMergeIntoUpdateOperation(icebergTableWriteProperties); } private void testMetadataColumns() { @@ -421,86 +409,82 @@ private void testDeleteMetadataColumn() { Assertions.assertEquals(0, queryResult1.size()); } - private void testIcebergDeleteOperation() { - getIcebergTablePropertyValues() - .forEach( - tuple -> { - String tableName = - String.format("test_iceberg_%s_%s_delete_operation", tuple._1(), tuple._2()); - dropTableIfExists(tableName); - createIcebergTableWithTabProperties( - tableName, - tuple._1(), - ImmutableMap.of( - ICEBERG_FORMAT_VERSION, - String.valueOf(tuple._2()), - ICEBERG_DELETE_MODE, - tuple._3())); - checkTableColumns(tableName, getSimpleTableColumn(), getTableInfo(tableName)); - checkTableRowLevelDelete(tableName); - }); + private void testIcebergDeleteOperation(IcebergTableWriteProperties icebergTableWriteProperties) { + String tableName = + String.format( + "test_iceberg_%s_%s_delete_operation", + icebergTableWriteProperties.isPartitionedTable, + icebergTableWriteProperties.formatVersion); + dropTableIfExists(tableName); + createIcebergTableWithTabProperties( + tableName, + icebergTableWriteProperties.isPartitionedTable, + ImmutableMap.of( + ICEBERG_FORMAT_VERSION, + String.valueOf(icebergTableWriteProperties.formatVersion), + ICEBERG_DELETE_MODE, + icebergTableWriteProperties.writeMode)); + checkTableColumns(tableName, getSimpleTableColumn(), getTableInfo(tableName)); + checkRowLevelDelete(tableName); } - private void testIcebergUpdateOperation() { - getIcebergTablePropertyValues() - .forEach( - tuple -> { - String tableName = - String.format("test_iceberg_%s_%s_update_operation", tuple._1(), 
tuple._2()); - dropTableIfExists(tableName); - createIcebergTableWithTabProperties( - tableName, - tuple._1(), - ImmutableMap.of( - ICEBERG_FORMAT_VERSION, - String.valueOf(tuple._2()), - ICEBERG_UPDATE_MODE, - tuple._3())); - checkTableColumns(tableName, getSimpleTableColumn(), getTableInfo(tableName)); - checkTableRowLevelUpdate(tableName); - }); + private void testIcebergUpdateOperation(IcebergTableWriteProperties icebergTableWriteProperties) { + String tableName = + String.format( + "test_iceberg_%s_%s_update_operation", + icebergTableWriteProperties.isPartitionedTable, + icebergTableWriteProperties.formatVersion); + dropTableIfExists(tableName); + createIcebergTableWithTabProperties( + tableName, + icebergTableWriteProperties.isPartitionedTable, + ImmutableMap.of( + ICEBERG_FORMAT_VERSION, + String.valueOf(icebergTableWriteProperties.formatVersion), + ICEBERG_UPDATE_MODE, + icebergTableWriteProperties.writeMode)); + checkTableColumns(tableName, getSimpleTableColumn(), getTableInfo(tableName)); + checkRowLevelUpdate(tableName); } - private void testIcebergMergeIntoDeleteOperation() { - getIcebergTablePropertyValues() - .forEach( - tuple -> { - String tableName = - String.format( - "test_iceberg_%s_%s_mergeinto_delete_operation", tuple._1(), tuple._2()); - dropTableIfExists(tableName); - createIcebergTableWithTabProperties( - tableName, - tuple._1(), - ImmutableMap.of( - ICEBERG_FORMAT_VERSION, - String.valueOf(tuple._2()), - ICEBERG_MERGE_MODE, - tuple._3())); - checkTableColumns(tableName, getSimpleTableColumn(), getTableInfo(tableName)); - checkTableDeleteByMergeInto(tableName); - }); + private void testIcebergMergeIntoDeleteOperation( + IcebergTableWriteProperties icebergTableWriteProperties) { + String tableName = + String.format( + "test_iceberg_%s_%s_mergeinto_delete_operation", + icebergTableWriteProperties.isPartitionedTable, + icebergTableWriteProperties.formatVersion); + dropTableIfExists(tableName); + createIcebergTableWithTabProperties( + tableName, + icebergTableWriteProperties.isPartitionedTable, + ImmutableMap.of( + ICEBERG_FORMAT_VERSION, + String.valueOf(icebergTableWriteProperties.formatVersion), + ICEBERG_MERGE_MODE, + icebergTableWriteProperties.writeMode)); + checkTableColumns(tableName, getSimpleTableColumn(), getTableInfo(tableName)); + checkDeleteByMergeInto(tableName); } - private void testIcebergMergeIntoUpdateOperation() { - getIcebergTablePropertyValues() - .forEach( - tuple -> { - String tableName = - String.format( - "test_iceberg_%s_%s_mergeinto_update_operation", tuple._1(), tuple._2()); - dropTableIfExists(tableName); - createIcebergTableWithTabProperties( - tableName, - tuple._1(), - ImmutableMap.of( - ICEBERG_FORMAT_VERSION, - String.valueOf(tuple._2()), - ICEBERG_MERGE_MODE, - tuple._3())); - checkTableColumns(tableName, getSimpleTableColumn(), getTableInfo(tableName)); - checkTableUpdateByMergeInto(tableName); - }); + private void testIcebergMergeIntoUpdateOperation( + IcebergTableWriteProperties icebergTableWriteProperties) { + String tableName = + String.format( + "test_iceberg_%s_%s_mergeinto_update_operation", + icebergTableWriteProperties.isPartitionedTable, + icebergTableWriteProperties.formatVersion); + dropTableIfExists(tableName); + createIcebergTableWithTabProperties( + tableName, + icebergTableWriteProperties.isPartitionedTable, + ImmutableMap.of( + ICEBERG_FORMAT_VERSION, + String.valueOf(icebergTableWriteProperties.formatVersion), + ICEBERG_MERGE_MODE, + icebergTableWriteProperties.writeMode)); + checkTableColumns(tableName, 
getSimpleTableColumn(), getTableInfo(tableName)); + checkTableUpdateByMergeInto(tableName); } private List getIcebergSimpleTableColumn() { @@ -534,12 +518,12 @@ private SparkMetadataColumnInfo[] getIcebergMetadataColumns() { }; } - private List> getIcebergTablePropertyValues() { + private List getIcebergTablePropertyValues() { return Arrays.asList( - new Tuple3<>(false, 1, "copy-on-write"), - new Tuple3<>(false, 2, "merge-on-read"), - new Tuple3<>(true, 1, "copy-on-write"), - new Tuple3<>(true, 2, "merge-on-read")); + IcebergTableWriteProperties.of(false, 1, "copy-on-write"), + IcebergTableWriteProperties.of(false, 2, "merge-on-read"), + IcebergTableWriteProperties.of(true, 1, "copy-on-write"), + IcebergTableWriteProperties.of(true, 2, "merge-on-read")); } private void createIcebergTableWithTabProperties( @@ -555,4 +539,24 @@ private void createIcebergTableWithTabProperties( tableName, partitionedClause, tblPropertiesStr); sql(createSql); } + + @Data + private static class IcebergTableWriteProperties { + + private boolean isPartitionedTable; + private int formatVersion; + private String writeMode; + + private IcebergTableWriteProperties( + boolean isPartitionedTable, int formatVersion, String writeMode) { + this.isPartitionedTable = isPartitionedTable; + this.formatVersion = formatVersion; + this.writeMode = writeMode; + } + + static IcebergTableWriteProperties of( + boolean isPartitionedTable, int formatVersion, String writeMode) { + return new IcebergTableWriteProperties(isPartitionedTable, formatVersion, writeMode); + } + } } diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java index 43d3b85adfb..4c322cf05a4 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java @@ -102,6 +102,8 @@ static SparkTableInfo create(Table baseTable) { sparkTableInfo.tableName = items[1]; sparkTableInfo.database = items[0]; sparkTableInfo.columns = + // using `baseTable.schema()` directly will fail because the `schema` method is + // deprecated in the Spark Table interface Arrays.stream(getSchema(baseTable).fields()) .map( sparkField -> diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkUtilIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkUtilIT.java index bad6fa0cb62..cd55e1205ba 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkUtilIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkUtilIT.java @@ -157,8 +157,8 @@ protected void insertTableAsSelect(String tableName, String newName) { sql(String.format("INSERT INTO TABLE %s SELECT * FROM %s", newName, tableName)); } - protected static String getSelectAllSqlWithOrder(String tableName) { - return String.format("SELECT * FROM %s ORDER BY id", tableName); + protected static String getSelectAllSqlWithOrder(String tableName, String orderByColumn) { + return String.format("SELECT * FROM %s ORDER BY %s", tableName, orderByColumn); } private static String getSelectAllSql(String tableName) { diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java 
b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java index 1cfc98de6ef..b4ba05222ea 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.Optional; import javax.ws.rs.NotSupportedException; -import lombok.SneakyThrows; import org.apache.commons.lang3.StringUtils; import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; @@ -164,10 +163,10 @@ public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceExcepti } } - @SneakyThrows @Override public Table createTable( - Identifier ident, Column[] columns, Transform[] transforms, Map properties) { + Identifier ident, Column[] columns, Transform[] transforms, Map properties) + throws TableAlreadyExistsException, NoSuchNamespaceException { NameIdentifier gravitinoIdentifier = NameIdentifier.of(metalakeName, catalogName, getDatabase(ident), ident.name()); com.datastrato.gravitino.rel.Column[] gravitinoColumns = @@ -197,7 +196,16 @@ public Table createTable( partitionings, distributionAndSortOrdersInfo.getDistribution(), distributionAndSortOrdersInfo.getSortOrders()); - Table sparkTable = sparkCatalog.loadTable(ident); + Table sparkTable; + try { + sparkTable = sparkCatalog.loadTable(ident); + } catch (NoSuchTableException e) { + throw new RuntimeException( + String.format( + "Failed to load the real sparkTable: %s", + String.join(".", getDatabase(ident), ident.name())), + e); + } return createSparkTable( ident, gravitinoTable, @@ -209,8 +217,6 @@ public Table createTable( throw new NoSuchNamespaceException(ident.namespace()); } catch (com.datastrato.gravitino.exceptions.TableAlreadyExistsException e) { throw new TableAlreadyExistsException(ident); - } catch (NoSuchTableException e) { - throw new NoSuchTableException(ident); } } diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java index 48c88f1262d..77d18bdc67a 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java @@ -8,7 +8,7 @@ import com.datastrato.gravitino.rel.Table; import com.datastrato.gravitino.spark.connector.PropertiesConverter; import com.datastrato.gravitino.spark.connector.SparkTransformConverter; -import com.datastrato.gravitino.spark.connector.utils.SparkBaseTableHelper; +import com.datastrato.gravitino.spark.connector.utils.GravitinoTableInfoHelper; import com.google.common.annotations.VisibleForTesting; import java.util.Map; import java.util.Set; @@ -16,6 +16,8 @@ import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.SupportsRead; +import org.apache.spark.sql.connector.catalog.SupportsWrite; import org.apache.spark.sql.connector.catalog.TableCapability; import org.apache.spark.sql.connector.catalog.TableCatalog; import 
org.apache.spark.sql.connector.expressions.Transform; @@ -28,66 +30,64 @@ /** Keep consistent behavior with the SparkIcebergTable */ public class SparkHiveTable extends HiveTable { - private SparkBaseTableHelper sparkBaseTableHelper; + private GravitinoTableInfoHelper gravitinoTableInfoHelper; + private org.apache.spark.sql.connector.catalog.Table sparkTable; public SparkHiveTable( Identifier identifier, Table gravitinoTable, - org.apache.spark.sql.connector.catalog.Table sparkHiveTable, + org.apache.spark.sql.connector.catalog.Table sparkTable, TableCatalog sparkHiveCatalog, PropertiesConverter propertiesConverter, SparkTransformConverter sparkTransformConverter) { super( SparkSession.active(), - ((HiveTable) sparkHiveTable).catalogTable(), + ((HiveTable) sparkTable).catalogTable(), (HiveTableCatalog) sparkHiveCatalog); - this.sparkBaseTableHelper = - new SparkBaseTableHelper( - identifier, - gravitinoTable, - sparkHiveTable, - propertiesConverter, - sparkTransformConverter); + this.gravitinoTableInfoHelper = + new GravitinoTableInfoHelper( + identifier, gravitinoTable, propertiesConverter, sparkTransformConverter); + this.sparkTable = sparkTable; } @Override public String name() { - return sparkBaseTableHelper.name(false); + return gravitinoTableInfoHelper.name(false); } @Override @SuppressWarnings("deprecation") public StructType schema() { - return sparkBaseTableHelper.schema(); + return gravitinoTableInfoHelper.schema(); } @Override public Map properties() { - return sparkBaseTableHelper.properties(); + return gravitinoTableInfoHelper.properties(); } @Override public Transform[] partitioning() { - return sparkBaseTableHelper.partitioning(); + return gravitinoTableInfoHelper.partitioning(); } @Override public Set capabilities() { - return sparkBaseTableHelper.capabilities(); + return sparkTable.capabilities(); } @Override public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { - return sparkBaseTableHelper.newScanBuilder(options); + return ((SupportsRead) sparkTable).newScanBuilder(options); } @Override public WriteBuilder newWriteBuilder(LogicalWriteInfo info) { - return sparkBaseTableHelper.newWriteBuilder(info); + return ((SupportsWrite) sparkTable).newWriteBuilder(info); } @VisibleForTesting public SparkTransformConverter getSparkTransformConverter() { - return sparkBaseTableHelper.getSparkTransformConverter(); + return gravitinoTableInfoHelper.getSparkTransformConverter(); } } diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java index 14afb7ef136..bd51e62e5b0 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java @@ -8,7 +8,7 @@ import com.datastrato.gravitino.rel.Table; import com.datastrato.gravitino.spark.connector.PropertiesConverter; import com.datastrato.gravitino.spark.connector.SparkTransformConverter; -import com.datastrato.gravitino.spark.connector.utils.SparkBaseTableHelper; +import com.datastrato.gravitino.spark.connector.utils.GravitinoTableInfoHelper; import com.google.common.annotations.VisibleForTesting; import java.lang.reflect.Field; import java.util.Map; @@ -19,7 +19,9 @@ import org.apache.spark.sql.connector.catalog.MetadataColumn; import 
org.apache.spark.sql.connector.catalog.SupportsDelete; import org.apache.spark.sql.connector.catalog.SupportsMetadataColumns; +import org.apache.spark.sql.connector.catalog.SupportsRead; import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations; +import org.apache.spark.sql.connector.catalog.SupportsWrite; import org.apache.spark.sql.connector.catalog.TableCapability; import org.apache.spark.sql.connector.catalog.TableCatalog; import org.apache.spark.sql.connector.expressions.Transform; @@ -38,85 +40,82 @@ */ public class SparkIcebergTable extends SparkTable { - private SparkBaseTableHelper sparkBaseTableHelper; + private GravitinoTableInfoHelper gravitinoTableInfoHelper; + private org.apache.spark.sql.connector.catalog.Table sparkTable; public SparkIcebergTable( Identifier identifier, Table gravitinoTable, - org.apache.spark.sql.connector.catalog.Table sparkIcebergTable, + org.apache.spark.sql.connector.catalog.Table sparkTable, TableCatalog sparkIcebergCatalog, PropertiesConverter propertiesConverter, SparkTransformConverter sparkTransformConverter) { - super(((SparkTable) sparkIcebergTable).table(), !isCacheEnabled(sparkIcebergCatalog)); - this.sparkBaseTableHelper = - new SparkBaseTableHelper( - identifier, - gravitinoTable, - sparkIcebergTable, - propertiesConverter, - sparkTransformConverter); + super(((SparkTable) sparkTable).table(), !isCacheEnabled(sparkIcebergCatalog)); + this.gravitinoTableInfoHelper = + new GravitinoTableInfoHelper( + identifier, gravitinoTable, propertiesConverter, sparkTransformConverter); + this.sparkTable = sparkTable; } @Override public String name() { - return sparkBaseTableHelper.name(true); + return gravitinoTableInfoHelper.name(true); } @Override @SuppressWarnings("deprecation") public StructType schema() { - return sparkBaseTableHelper.schema(); + return gravitinoTableInfoHelper.schema(); } @Override public Map properties() { - return sparkBaseTableHelper.properties(); + return gravitinoTableInfoHelper.properties(); } @Override public Transform[] partitioning() { - return sparkBaseTableHelper.partitioning(); + return gravitinoTableInfoHelper.partitioning(); } @Override public Set capabilities() { - return sparkBaseTableHelper.capabilities(); + return sparkTable.capabilities(); } @Override public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { - return sparkBaseTableHelper.newScanBuilder(options); + return ((SupportsRead) sparkTable).newScanBuilder(options); } @Override public WriteBuilder newWriteBuilder(LogicalWriteInfo info) { - return sparkBaseTableHelper.newWriteBuilder(info); + return ((SupportsWrite) sparkTable).newWriteBuilder(info); } @Override public boolean canDeleteWhere(Filter[] filters) { - return ((SupportsDelete) sparkBaseTableHelper.getSparkTable()).canDeleteWhere(filters); + return ((SupportsDelete) sparkTable).canDeleteWhere(filters); } @Override public void deleteWhere(Filter[] filters) { - ((SupportsDelete) sparkBaseTableHelper.getSparkTable()).deleteWhere(filters); + ((SupportsDelete) sparkTable).deleteWhere(filters); } @Override public MetadataColumn[] metadataColumns() { - return ((SupportsMetadataColumns) sparkBaseTableHelper.getSparkTable()).metadataColumns(); + return ((SupportsMetadataColumns) sparkTable).metadataColumns(); } @Override public RowLevelOperationBuilder newRowLevelOperationBuilder(RowLevelOperationInfo info) { - return ((SupportsRowLevelOperations) sparkBaseTableHelper.getSparkTable()) - .newRowLevelOperationBuilder(info); + return ((SupportsRowLevelOperations) 
sparkTable).newRowLevelOperationBuilder(info); } @VisibleForTesting public SparkTransformConverter getSparkTransformConverter() { - return sparkBaseTableHelper.getSparkTransformConverter(); + return gravitinoTableInfoHelper.getSparkTransformConverter(); } private static boolean isCacheEnabled(TableCatalog sparkIcebergCatalog) { diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java index 201666cc004..3a80d7a6148 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java @@ -5,7 +5,8 @@ package com.datastrato.gravitino.spark.connector.plugin; -import static com.datastrato.gravitino.spark.connector.utils.ConnectorUtil.removeDuplicates; +import static com.datastrato.gravitino.spark.connector.ConnectorConstants.COMMA; +import static com.datastrato.gravitino.spark.connector.utils.ConnectorUtil.removeDuplicateSparkExtensions; import com.datastrato.gravitino.Catalog; import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig; @@ -110,12 +111,17 @@ private void registerCatalog(SparkConf sparkConf, String catalogName, String pro } private void registerSqlExtensions(SparkConf conf) { - String gravitinoDriverExtensions = String.join(",", GRAVITINO_DRIVER_EXTENSIONS); + String gravitinoDriverExtensions = String.join(COMMA, GRAVITINO_DRIVER_EXTENSIONS); if (conf.contains(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key())) { String sparkSessionExtensions = conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key()); - conf.set( - StaticSQLConf.SPARK_SESSION_EXTENSIONS().key(), - removeDuplicates(GRAVITINO_DRIVER_EXTENSIONS, sparkSessionExtensions)); + if (StringUtils.isNotBlank(sparkSessionExtensions)) { + conf.set( + StaticSQLConf.SPARK_SESSION_EXTENSIONS().key(), + removeDuplicateSparkExtensions( + GRAVITINO_DRIVER_EXTENSIONS, sparkSessionExtensions.split(COMMA))); + } else { + conf.set(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key(), gravitinoDriverExtensions); + } } else { conf.set(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key(), gravitinoDriverExtensions); } diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/ConnectorUtil.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/ConnectorUtil.java index eeaa56c9da2..e09c125b384 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/ConnectorUtil.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/ConnectorUtil.java @@ -10,14 +10,14 @@ import java.util.Arrays; import java.util.LinkedHashSet; import java.util.Set; -import org.apache.commons.lang3.StringUtils; public class ConnectorUtil { - public static String removeDuplicates(String[] elements, String otherElements) { - Set uniqueElements = new LinkedHashSet<>(Arrays.asList(elements)); - if (StringUtils.isNotBlank(otherElements)) { - uniqueElements.addAll(Arrays.asList(otherElements.split(COMMA))); + public static String removeDuplicateSparkExtensions( + String[] extensions, String[] addedExtensions) { + Set uniqueElements = new LinkedHashSet<>(Arrays.asList(extensions)); + if (addedExtensions != null && 
addedExtensions.length > 0) { + uniqueElements.addAll(Arrays.asList(addedExtensions)); } return uniqueElements.stream() .reduce((element1, element2) -> element1 + COMMA + element2) diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/SparkBaseTableHelper.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/GravitinoTableInfoHelper.java similarity index 76% rename from spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/SparkBaseTableHelper.java rename to spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/GravitinoTableInfoHelper.java index 461ce49384a..8624aac2572 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/SparkBaseTableHelper.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/GravitinoTableInfoHelper.java @@ -16,46 +16,33 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; import org.apache.spark.sql.connector.catalog.Identifier; -import org.apache.spark.sql.connector.catalog.SupportsRead; -import org.apache.spark.sql.connector.catalog.SupportsWrite; -import org.apache.spark.sql.connector.catalog.Table; -import org.apache.spark.sql.connector.catalog.TableCapability; import org.apache.spark.sql.connector.expressions.Transform; -import org.apache.spark.sql.connector.read.ScanBuilder; -import org.apache.spark.sql.connector.write.LogicalWriteInfo; -import org.apache.spark.sql.connector.write.WriteBuilder; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.MetadataBuilder; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; -import org.apache.spark.sql.util.CaseInsensitiveStringMap; /** - * Provides schema info from Gravitino, IO from the internal spark table. The specific catalog table - * could implement more capabilities like SupportsPartitionManagement for Hive table, SupportsIndex - * for JDBC table, SupportsRowLevelOperations for Iceberg table. 
+ * GravitinoTableInfoHelper is a common helper class that is used to retrieve table info from the + * Gravitino Server */ -public class SparkBaseTableHelper { +public class GravitinoTableInfoHelper { private Identifier identifier; private com.datastrato.gravitino.rel.Table gravitinoTable; - private Table sparkTable; private PropertiesConverter propertiesConverter; private SparkTransformConverter sparkTransformConverter; - public SparkBaseTableHelper( + public GravitinoTableInfoHelper( Identifier identifier, com.datastrato.gravitino.rel.Table gravitinoTable, - Table sparkTable, PropertiesConverter propertiesConverter, SparkTransformConverter sparkTransformConverter) { this.identifier = identifier; this.gravitinoTable = gravitinoTable; - this.sparkTable = sparkTable; this.propertiesConverter = propertiesConverter; this.sparkTransformConverter = sparkTransformConverter; } @@ -111,22 +98,6 @@ public Transform[] partitioning() { return sparkTransformConverter.toSparkTransform(partitions, distribution, sortOrders); } - public Set capabilities() { - return sparkTable.capabilities(); - } - - public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { - return ((SupportsRead) sparkTable).newScanBuilder(options); - } - - public WriteBuilder newWriteBuilder(LogicalWriteInfo info) { - return ((SupportsWrite) sparkTable).newWriteBuilder(info); - } - - public Table getSparkTable() { - return sparkTable; - } - public SparkTransformConverter getSparkTransformConverter() { return sparkTransformConverter; } diff --git a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/utils/TestConnectorUtil.java b/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/utils/TestConnectorUtil.java index 81d452d28e8..96366586e8d 100644 --- a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/utils/TestConnectorUtil.java +++ b/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/utils/TestConnectorUtil.java @@ -5,6 +5,8 @@ package com.datastrato.gravitino.spark.connector.utils; +import static com.datastrato.gravitino.spark.connector.ConnectorConstants.COMMA; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; @@ -14,18 +16,24 @@ public class TestConnectorUtil { @Test void testRemoveDuplicates() { - String[] elements = {"a", "b", "c"}; - String otherElements = "a,d,e"; - String result = ConnectorUtil.removeDuplicates(elements, otherElements); + String[] extensions = {"a", "b", "c"}; + String addedExtensions = "a,d,e"; + String result = + ConnectorUtil.removeDuplicateSparkExtensions(extensions, addedExtensions.split(COMMA)); Assertions.assertEquals(result, "a,b,c,d,e"); - elements = new String[] {"a", "a", "b", "c"}; - otherElements = ""; - result = ConnectorUtil.removeDuplicates(elements, otherElements); + extensions = new String[] {"a", "a", "b", "c"}; + addedExtensions = ""; + result = ConnectorUtil.removeDuplicateSparkExtensions(extensions, addedExtensions.split(COMMA)); + Assertions.assertEquals(result, "a,b,c"); + + extensions = new String[] {"a", "a", "b", "c"}; + addedExtensions = "b"; + result = ConnectorUtil.removeDuplicateSparkExtensions(extensions, addedExtensions.split(COMMA)); Assertions.assertEquals(result, "a,b,c"); - elements = new String[] {"a", "a", "b", "c"}; - result = ConnectorUtil.removeDuplicates(elements, null); + extensions = new String[] {"a", "a", "b", "c"}; + result = 
ConnectorUtil.removeDuplicateSparkExtensions(extensions, null); Assertions.assertEquals(result, "a,b,c"); } } From b8e9952b58768c04bf39e543c4f2fac5c7a449df Mon Sep 17 00:00:00 2001 From: caican00 Date: Tue, 7 May 2024 17:26:08 +0800 Subject: [PATCH 3/6] update --- .../gravitino/spark/connector/utils/ConnectorUtil.java | 3 ++- .../gravitino/spark/connector/utils/TestConnectorUtil.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/ConnectorUtil.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/ConnectorUtil.java index e09c125b384..673d6cf0380 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/ConnectorUtil.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/ConnectorUtil.java @@ -10,13 +10,14 @@ import java.util.Arrays; import java.util.LinkedHashSet; import java.util.Set; +import org.apache.commons.lang3.StringUtils; public class ConnectorUtil { public static String removeDuplicateSparkExtensions( String[] extensions, String[] addedExtensions) { Set uniqueElements = new LinkedHashSet<>(Arrays.asList(extensions)); - if (addedExtensions != null && addedExtensions.length > 0) { + if (addedExtensions != null && StringUtils.isNoneBlank(addedExtensions)) { uniqueElements.addAll(Arrays.asList(addedExtensions)); } return uniqueElements.stream() diff --git a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/utils/TestConnectorUtil.java b/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/utils/TestConnectorUtil.java index 96366586e8d..4f1d73dd024 100644 --- a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/utils/TestConnectorUtil.java +++ b/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/utils/TestConnectorUtil.java @@ -15,7 +15,7 @@ public class TestConnectorUtil { @Test - void testRemoveDuplicates() { + void testRemoveDuplicateSparkExtensions() { String[] extensions = {"a", "b", "c"}; String addedExtensions = "a,d,e"; String result = From 2f3594f3592f81962f14c4bcb228403fc9f0c73a Mon Sep 17 00:00:00 2001 From: caican00 Date: Sat, 11 May 2024 14:56:43 +0800 Subject: [PATCH 4/6] update --- .../spark/iceberg/SparkIcebergCatalogIT.java | 10 ++-- .../spark/connector/catalog/BaseCatalog.java | 37 ++----------- .../connector/hive/GravitinoHiveCatalog.java | 12 ++++- .../spark/connector/hive/SparkHiveTable.java | 25 --------- .../iceberg/GravitinoIcebergCatalog.java | 12 ++++- .../connector/iceberg/SparkIcebergTable.java | 52 ------------------- 6 files changed, 31 insertions(+), 117 deletions(-) diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java index a757db1be5d..a87246bfce1 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogIT.java @@ -416,7 +416,7 @@ private void testIcebergDeleteOperation(IcebergTableWriteProperties icebergTable icebergTableWriteProperties.isPartitionedTable, 
icebergTableWriteProperties.formatVersion); dropTableIfExists(tableName); - createIcebergTableWithTabProperties( + createIcebergTableWithTableProperties( tableName, icebergTableWriteProperties.isPartitionedTable, ImmutableMap.of( @@ -435,7 +435,7 @@ private void testIcebergUpdateOperation(IcebergTableWriteProperties icebergTable icebergTableWriteProperties.isPartitionedTable, icebergTableWriteProperties.formatVersion); dropTableIfExists(tableName); - createIcebergTableWithTabProperties( + createIcebergTableWithTableProperties( tableName, icebergTableWriteProperties.isPartitionedTable, ImmutableMap.of( @@ -455,7 +455,7 @@ private void testIcebergMergeIntoDeleteOperation( icebergTableWriteProperties.isPartitionedTable, icebergTableWriteProperties.formatVersion); dropTableIfExists(tableName); - createIcebergTableWithTabProperties( + createIcebergTableWithTableProperties( tableName, icebergTableWriteProperties.isPartitionedTable, ImmutableMap.of( @@ -475,7 +475,7 @@ private void testIcebergMergeIntoUpdateOperation( icebergTableWriteProperties.isPartitionedTable, icebergTableWriteProperties.formatVersion); dropTableIfExists(tableName); - createIcebergTableWithTabProperties( + createIcebergTableWithTableProperties( tableName, icebergTableWriteProperties.isPartitionedTable, ImmutableMap.of( @@ -526,7 +526,7 @@ private List getIcebergTablePropertyValues() { IcebergTableWriteProperties.of(true, 2, "merge-on-read")); } - private void createIcebergTableWithTabProperties( + private void createIcebergTableWithTableProperties( String tableName, boolean isPartitioned, ImmutableMap tblProperties) { String partitionedClause = isPartitioned ? " PARTITIONED BY (name) " : ""; String tblPropertiesStr = diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java index b4ba05222ea..7fc4b20fa25 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java @@ -92,7 +92,6 @@ protected abstract TableCatalog createAndInitSparkCatalog( * * @param identifier Spark's table identifier * @param gravitinoTable Gravitino table to do DDL operations - * @param sparkTable specific Spark table to do IO operations * @param sparkCatalog specific Spark catalog to do IO operations * @param propertiesConverter transform properties between Gravitino and Spark * @param sparkTransformConverter sparkTransformConverter convert transforms between Gravitino and @@ -102,7 +101,6 @@ protected abstract TableCatalog createAndInitSparkCatalog( protected abstract Table createSparkTable( Identifier identifier, com.datastrato.gravitino.rel.Table gravitinoTable, - Table sparkTable, TableCatalog sparkCatalog, PropertiesConverter propertiesConverter, SparkTransformConverter sparkTransformConverter); @@ -196,23 +194,8 @@ public Table createTable( partitionings, distributionAndSortOrdersInfo.getDistribution(), distributionAndSortOrdersInfo.getSortOrders()); - Table sparkTable; - try { - sparkTable = sparkCatalog.loadTable(ident); - } catch (NoSuchTableException e) { - throw new RuntimeException( - String.format( - "Failed to load the real sparkTable: %s", - String.join(".", getDatabase(ident), ident.name())), - e); - } return createSparkTable( - ident, - gravitinoTable, - sparkTable, - sparkCatalog, 
- propertiesConverter, - sparkTransformConverter); + ident, gravitinoTable, sparkCatalog, propertiesConverter, sparkTransformConverter); } catch (NoSuchSchemaException e) { throw new NoSuchNamespaceException(ident.namespace()); } catch (com.datastrato.gravitino.exceptions.TableAlreadyExistsException e) { @@ -228,15 +211,9 @@ public Table loadTable(Identifier ident) throws NoSuchTableException { gravitinoCatalogClient .asTableCatalog() .loadTable(NameIdentifier.of(metalakeName, catalogName, database, ident.name())); - Table sparkTable = sparkCatalog.loadTable(ident); // Will create a catalog specific table return createSparkTable( - ident, - gravitinoTable, - sparkTable, - sparkCatalog, - propertiesConverter, - sparkTransformConverter); + ident, gravitinoTable, sparkCatalog, propertiesConverter, sparkTransformConverter); } catch (com.datastrato.gravitino.exceptions.NoSuchTableException e) { throw new NoSuchTableException(ident); } @@ -263,14 +240,8 @@ public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchT .alterTable( NameIdentifier.of(metalakeName, catalogName, getDatabase(ident), ident.name()), gravitinoTableChanges); - Table sparkTable = sparkCatalog.loadTable(ident); return createSparkTable( - ident, - gravitinoTable, - sparkTable, - sparkCatalog, - propertiesConverter, - sparkTransformConverter); + ident, gravitinoTable, sparkCatalog, propertiesConverter, sparkTransformConverter); } catch (com.datastrato.gravitino.exceptions.NoSuchTableException e) { throw new NoSuchTableException(ident); } @@ -432,7 +403,7 @@ private com.datastrato.gravitino.rel.Column createGravitinoColumn(Column sparkCo com.datastrato.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET); } - private String getDatabase(Identifier sparkIdentifier) { + protected String getDatabase(Identifier sparkIdentifier) { if (sparkIdentifier.namespace().length > 0) { return sparkIdentifier.namespace()[0]; } diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java index a1cefdaf3a9..3de133553aa 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java @@ -15,6 +15,7 @@ import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; import org.apache.spark.sql.util.CaseInsensitiveStringMap; @@ -44,10 +45,19 @@ protected TableCatalog createAndInitSparkCatalog( protected org.apache.spark.sql.connector.catalog.Table createSparkTable( Identifier identifier, Table gravitinoTable, - org.apache.spark.sql.connector.catalog.Table sparkTable, TableCatalog sparkHiveCatalog, PropertiesConverter propertiesConverter, SparkTransformConverter sparkTransformConverter) { + org.apache.spark.sql.connector.catalog.Table sparkTable; + try { + sparkTable = sparkHiveCatalog.loadTable(identifier); + } catch (NoSuchTableException e) { + throw new RuntimeException( + String.format( + "Failed to load the real sparkTable: %s", + String.join(".", getDatabase(identifier), identifier.name())), + e); + } return new 
SparkHiveTable( identifier, gravitinoTable, diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java index 77d18bdc67a..afa12e7685f 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java @@ -11,27 +11,18 @@ import com.datastrato.gravitino.spark.connector.utils.GravitinoTableInfoHelper; import com.google.common.annotations.VisibleForTesting; import java.util.Map; -import java.util.Set; import org.apache.kyuubi.spark.connector.hive.HiveTable; import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.connector.catalog.Identifier; -import org.apache.spark.sql.connector.catalog.SupportsRead; -import org.apache.spark.sql.connector.catalog.SupportsWrite; -import org.apache.spark.sql.connector.catalog.TableCapability; import org.apache.spark.sql.connector.catalog.TableCatalog; import org.apache.spark.sql.connector.expressions.Transform; -import org.apache.spark.sql.connector.read.ScanBuilder; -import org.apache.spark.sql.connector.write.LogicalWriteInfo; -import org.apache.spark.sql.connector.write.WriteBuilder; import org.apache.spark.sql.types.StructType; -import org.apache.spark.sql.util.CaseInsensitiveStringMap; /** Keep consistent behavior with the SparkIcebergTable */ public class SparkHiveTable extends HiveTable { private GravitinoTableInfoHelper gravitinoTableInfoHelper; - private org.apache.spark.sql.connector.catalog.Table sparkTable; public SparkHiveTable( Identifier identifier, @@ -47,7 +38,6 @@ public SparkHiveTable( this.gravitinoTableInfoHelper = new GravitinoTableInfoHelper( identifier, gravitinoTable, propertiesConverter, sparkTransformConverter); - this.sparkTable = sparkTable; } @Override @@ -71,21 +61,6 @@ public Transform[] partitioning() { return gravitinoTableInfoHelper.partitioning(); } - @Override - public Set capabilities() { - return sparkTable.capabilities(); - } - - @Override - public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { - return ((SupportsRead) sparkTable).newScanBuilder(options); - } - - @Override - public WriteBuilder newWriteBuilder(LogicalWriteInfo info) { - return ((SupportsWrite) sparkTable).newWriteBuilder(info); - } - @VisibleForTesting public SparkTransformConverter getSparkTransformConverter() { return gravitinoTableInfoHelper.getSparkTransformConverter(); diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java index 5355dbc3dfd..c981f68516d 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java @@ -17,6 +17,7 @@ import org.apache.iceberg.spark.SparkCatalog; import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import 
org.apache.spark.sql.connector.catalog.FunctionCatalog; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; @@ -68,10 +69,19 @@ protected TableCatalog createAndInitSparkCatalog( protected org.apache.spark.sql.connector.catalog.Table createSparkTable( Identifier identifier, Table gravitinoTable, - org.apache.spark.sql.connector.catalog.Table sparkTable, TableCatalog sparkIcebergCatalog, PropertiesConverter propertiesConverter, SparkTransformConverter sparkTransformConverter) { + org.apache.spark.sql.connector.catalog.Table sparkTable; + try { + sparkTable = sparkIcebergCatalog.loadTable(identifier); + } catch (NoSuchTableException e) { + throw new RuntimeException( + String.format( + "Failed to load the real sparkTable: %s", + String.join(".", getDatabase(identifier), identifier.name())), + e); + } return new SparkIcebergTable( identifier, gravitinoTable, diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java index bd51e62e5b0..58bd64d5163 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java @@ -12,27 +12,12 @@ import com.google.common.annotations.VisibleForTesting; import java.lang.reflect.Field; import java.util.Map; -import java.util.Set; import org.apache.iceberg.spark.SparkCatalog; import org.apache.iceberg.spark.source.SparkTable; import org.apache.spark.sql.connector.catalog.Identifier; -import org.apache.spark.sql.connector.catalog.MetadataColumn; -import org.apache.spark.sql.connector.catalog.SupportsDelete; -import org.apache.spark.sql.connector.catalog.SupportsMetadataColumns; -import org.apache.spark.sql.connector.catalog.SupportsRead; -import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations; -import org.apache.spark.sql.connector.catalog.SupportsWrite; -import org.apache.spark.sql.connector.catalog.TableCapability; import org.apache.spark.sql.connector.catalog.TableCatalog; import org.apache.spark.sql.connector.expressions.Transform; -import org.apache.spark.sql.connector.read.ScanBuilder; -import org.apache.spark.sql.connector.write.LogicalWriteInfo; -import org.apache.spark.sql.connector.write.RowLevelOperationBuilder; -import org.apache.spark.sql.connector.write.RowLevelOperationInfo; -import org.apache.spark.sql.connector.write.WriteBuilder; -import org.apache.spark.sql.sources.Filter; import org.apache.spark.sql.types.StructType; -import org.apache.spark.sql.util.CaseInsensitiveStringMap; /** * For spark-connector in Iceberg, it explicitly uses SparkTable to identify whether it is an @@ -41,7 +26,6 @@ public class SparkIcebergTable extends SparkTable { private GravitinoTableInfoHelper gravitinoTableInfoHelper; - private org.apache.spark.sql.connector.catalog.Table sparkTable; public SparkIcebergTable( Identifier identifier, @@ -54,7 +38,6 @@ public SparkIcebergTable( this.gravitinoTableInfoHelper = new GravitinoTableInfoHelper( identifier, gravitinoTable, propertiesConverter, sparkTransformConverter); - this.sparkTable = sparkTable; } @Override @@ -78,41 +61,6 @@ public Transform[] partitioning() { return gravitinoTableInfoHelper.partitioning(); } - @Override - public Set capabilities() { - return 
sparkTable.capabilities(); - } - - @Override - public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { - return ((SupportsRead) sparkTable).newScanBuilder(options); - } - - @Override - public WriteBuilder newWriteBuilder(LogicalWriteInfo info) { - return ((SupportsWrite) sparkTable).newWriteBuilder(info); - } - - @Override - public boolean canDeleteWhere(Filter[] filters) { - return ((SupportsDelete) sparkTable).canDeleteWhere(filters); - } - - @Override - public void deleteWhere(Filter[] filters) { - ((SupportsDelete) sparkTable).deleteWhere(filters); - } - - @Override - public MetadataColumn[] metadataColumns() { - return ((SupportsMetadataColumns) sparkTable).metadataColumns(); - } - - @Override - public RowLevelOperationBuilder newRowLevelOperationBuilder(RowLevelOperationInfo info) { - return ((SupportsRowLevelOperations) sparkTable).newRowLevelOperationBuilder(info); - } - @VisibleForTesting public SparkTransformConverter getSparkTransformConverter() { return gravitinoTableInfoHelper.getSparkTransformConverter(); From 27d05648fd431abeebf5582350cb0986d104c1e8 Mon Sep 17 00:00:00 2001 From: caican00 Date: Sat, 11 May 2024 17:18:35 +0800 Subject: [PATCH 5/6] update --- .../test/util/spark/SparkTableInfo.java | 20 +++------------- .../connector/SparkTransformConverter.java | 6 ----- .../connector/hive/GravitinoHiveCatalog.java | 5 ++-- .../spark/connector/hive/SparkHiveTable.java | 20 ++++------------ .../iceberg/GravitinoIcebergCatalog.java | 5 ++-- .../connector/iceberg/SparkIcebergTable.java | 24 +++++++------------ .../utils/GravitinoTableInfoHelper.java | 10 ++++---- 7 files changed, 28 insertions(+), 62 deletions(-) diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java index 4c322cf05a4..0ef93040d4e 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/util/spark/SparkTableInfo.java @@ -6,7 +6,6 @@ package com.datastrato.gravitino.integration.test.util.spark; import com.datastrato.gravitino.spark.connector.ConnectorConstants; -import com.datastrato.gravitino.spark.connector.SparkTransformConverter; import com.datastrato.gravitino.spark.connector.hive.SparkHiveTable; import com.datastrato.gravitino.spark.connector.iceberg.SparkIcebergTable; import java.util.ArrayList; @@ -115,14 +114,12 @@ static SparkTableInfo create(Table baseTable) { .collect(Collectors.toList()); sparkTableInfo.comment = baseTable.properties().remove(ConnectorConstants.COMMENT); sparkTableInfo.tableProperties = baseTable.properties(); - boolean supportsBucketPartition = - getSparkTransformConverter(baseTable).isSupportsBucketPartition(); Arrays.stream(baseTable.partitioning()) .forEach( transform -> { if (transform instanceof BucketTransform || transform instanceof SortedBucketTransform) { - if (isBucketPartition(supportsBucketPartition, transform)) { + if (isBucketPartition(baseTable, transform)) { sparkTableInfo.addPartition(transform); } else { sparkTableInfo.setBucket(transform); @@ -167,19 +164,8 @@ public List getPartitionedColumns() { .collect(Collectors.toList()); } - private static boolean isBucketPartition(boolean supportsBucketPartition, Transform transform) { - return supportsBucketPartition && !(transform instanceof SortedBucketTransform); - } - - private static 
SparkTransformConverter getSparkTransformConverter(Table baseTable) { - if (baseTable instanceof SparkHiveTable) { - return ((SparkHiveTable) baseTable).getSparkTransformConverter(); - } else if (baseTable instanceof SparkIcebergTable) { - return ((SparkIcebergTable) baseTable).getSparkTransformConverter(); - } else { - throw new IllegalArgumentException( - "Doesn't support Spark table: " + baseTable.getClass().getName()); - } + private static boolean isBucketPartition(Table baseTable, Transform transform) { + return baseTable instanceof SparkIcebergTable && !(transform instanceof SortedBucketTransform); } private static StructType getSchema(Table baseTable) { diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/SparkTransformConverter.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/SparkTransformConverter.java index a636699024d..d830af0719d 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/SparkTransformConverter.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/SparkTransformConverter.java @@ -13,7 +13,6 @@ import com.datastrato.gravitino.rel.expressions.sorts.SortOrders; import com.datastrato.gravitino.rel.expressions.transforms.Transform; import com.datastrato.gravitino.rel.expressions.transforms.Transforms; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Arrays; @@ -59,11 +58,6 @@ public SparkTransformConverter(boolean supportsBucketPartition) { this.supportsBucketPartition = supportsBucketPartition; } - @VisibleForTesting - public boolean isSupportsBucketPartition() { - return supportsBucketPartition; - } - @Getter public static class DistributionAndSortOrdersInfo { private Distribution distribution; diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java index 3de133553aa..28a17e3620b 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java @@ -14,6 +14,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.commons.lang3.StringUtils; +import org.apache.kyuubi.spark.connector.hive.HiveTable; import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.connector.catalog.Identifier; @@ -61,8 +62,8 @@ protected org.apache.spark.sql.connector.catalog.Table createSparkTable( return new SparkHiveTable( identifier, gravitinoTable, - sparkTable, - sparkHiveCatalog, + (HiveTable) sparkTable, + (HiveTableCatalog) sparkHiveCatalog, propertiesConverter, sparkTransformConverter); } diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java index afa12e7685f..e27916af283 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java +++ 
b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java @@ -9,13 +9,11 @@ import com.datastrato.gravitino.spark.connector.PropertiesConverter; import com.datastrato.gravitino.spark.connector.SparkTransformConverter; import com.datastrato.gravitino.spark.connector.utils.GravitinoTableInfoHelper; -import com.google.common.annotations.VisibleForTesting; import java.util.Map; import org.apache.kyuubi.spark.connector.hive.HiveTable; import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.connector.catalog.Identifier; -import org.apache.spark.sql.connector.catalog.TableCatalog; import org.apache.spark.sql.connector.expressions.Transform; import org.apache.spark.sql.types.StructType; @@ -27,22 +25,19 @@ public class SparkHiveTable extends HiveTable { public SparkHiveTable( Identifier identifier, Table gravitinoTable, - org.apache.spark.sql.connector.catalog.Table sparkTable, - TableCatalog sparkHiveCatalog, + HiveTable hiveTable, + HiveTableCatalog hiveTableCatalog, PropertiesConverter propertiesConverter, SparkTransformConverter sparkTransformConverter) { - super( - SparkSession.active(), - ((HiveTable) sparkTable).catalogTable(), - (HiveTableCatalog) sparkHiveCatalog); + super(SparkSession.active(), hiveTable.catalogTable(), hiveTableCatalog); this.gravitinoTableInfoHelper = new GravitinoTableInfoHelper( - identifier, gravitinoTable, propertiesConverter, sparkTransformConverter); + false, identifier, gravitinoTable, propertiesConverter, sparkTransformConverter); } @Override public String name() { - return gravitinoTableInfoHelper.name(false); + return gravitinoTableInfoHelper.name(); } @Override @@ -60,9 +55,4 @@ public Map properties() { public Transform[] partitioning() { return gravitinoTableInfoHelper.partitioning(); } - - @VisibleForTesting - public SparkTransformConverter getSparkTransformConverter() { - return gravitinoTableInfoHelper.getSparkTransformConverter(); - } } diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java index c981f68516d..79eda81b525 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java +++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java @@ -15,6 +15,7 @@ import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.iceberg.spark.SparkCatalog; +import org.apache.iceberg.spark.source.SparkTable; import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; @@ -85,8 +86,8 @@ protected org.apache.spark.sql.connector.catalog.Table createSparkTable( return new SparkIcebergTable( identifier, gravitinoTable, - sparkTable, - sparkIcebergCatalog, + (SparkTable) sparkTable, + (SparkCatalog) sparkIcebergCatalog, propertiesConverter, sparkTransformConverter); } diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java index 
index 58bd64d5163..870ff535f88 100644
--- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java
+++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java
@@ -9,13 +9,11 @@
 import com.datastrato.gravitino.spark.connector.PropertiesConverter;
 import com.datastrato.gravitino.spark.connector.SparkTransformConverter;
 import com.datastrato.gravitino.spark.connector.utils.GravitinoTableInfoHelper;
-import com.google.common.annotations.VisibleForTesting;
 import java.lang.reflect.Field;
 import java.util.Map;
 import org.apache.iceberg.spark.SparkCatalog;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
@@ -30,19 +28,19 @@ public class SparkIcebergTable extends SparkTable {
   public SparkIcebergTable(
       Identifier identifier,
       Table gravitinoTable,
-      org.apache.spark.sql.connector.catalog.Table sparkTable,
-      TableCatalog sparkIcebergCatalog,
+      SparkTable sparkTable,
+      SparkCatalog sparkCatalog,
       PropertiesConverter propertiesConverter,
       SparkTransformConverter sparkTransformConverter) {
-    super(((SparkTable) sparkTable).table(), !isCacheEnabled(sparkIcebergCatalog));
+    super(sparkTable.table(), !isCacheEnabled(sparkCatalog));
     this.gravitinoTableInfoHelper =
         new GravitinoTableInfoHelper(
-            identifier, gravitinoTable, propertiesConverter, sparkTransformConverter);
+            true, identifier, gravitinoTable, propertiesConverter, sparkTransformConverter);
   }

   @Override
   public String name() {
-    return gravitinoTableInfoHelper.name(true);
+    return gravitinoTableInfoHelper.name();
   }

   @Override
@@ -61,17 +59,11 @@ public Transform[] partitioning() {
     return gravitinoTableInfoHelper.partitioning();
   }

-  @VisibleForTesting
-  public SparkTransformConverter getSparkTransformConverter() {
-    return gravitinoTableInfoHelper.getSparkTransformConverter();
-  }
-
-  private static boolean isCacheEnabled(TableCatalog sparkIcebergCatalog) {
+  private static boolean isCacheEnabled(SparkCatalog sparkCatalog) {
     try {
-      SparkCatalog catalog = ((SparkCatalog) sparkIcebergCatalog);
-      Field cacheEnabled = catalog.getClass().getDeclaredField("cacheEnabled");
+      Field cacheEnabled = sparkCatalog.getClass().getDeclaredField("cacheEnabled");
       cacheEnabled.setAccessible(true);
-      return cacheEnabled.getBoolean(catalog);
+      return cacheEnabled.getBoolean(sparkCatalog);
     } catch (NoSuchFieldException | IllegalAccessException e) {
       throw new RuntimeException("Failed to get cacheEnabled field from SparkCatalog", e);
     }
diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/GravitinoTableInfoHelper.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/GravitinoTableInfoHelper.java
index 8624aac2572..a1ab61021c4 100644
--- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/GravitinoTableInfoHelper.java
+++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/GravitinoTableInfoHelper.java
@@ -31,24 +31,27 @@
  */
 public class GravitinoTableInfoHelper {

+  private boolean isCaseSensitive;
   private Identifier identifier;
   private com.datastrato.gravitino.rel.Table gravitinoTable;
   private PropertiesConverter propertiesConverter;
   private SparkTransformConverter sparkTransformConverter;

   public GravitinoTableInfoHelper(
+      boolean isCaseSensitive,
       Identifier identifier,
       com.datastrato.gravitino.rel.Table gravitinoTable,
       PropertiesConverter propertiesConverter,
       SparkTransformConverter sparkTransformConverter) {
+    this.isCaseSensitive = isCaseSensitive;
     this.identifier = identifier;
     this.gravitinoTable = gravitinoTable;
     this.propertiesConverter = propertiesConverter;
     this.sparkTransformConverter = sparkTransformConverter;
   }

-  public String name(boolean isCaseSensitive) {
-    return getNormalizedIdentifier(identifier, gravitinoTable.name(), isCaseSensitive);
+  public String name() {
+    return getNormalizedIdentifier(identifier, gravitinoTable.name());
   }

   public StructType schema() {
@@ -104,8 +107,7 @@ public SparkTransformConverter getSparkTransformConverter() {
   // The underlying catalogs may not case-sensitive, to keep consistent with the action of SparkSQL,
   // we should return normalized identifiers.
-  private String getNormalizedIdentifier(
-      Identifier tableIdentifier, String gravitinoTableName, boolean isCaseSensitive) {
+  private String getNormalizedIdentifier(Identifier tableIdentifier, String gravitinoTableName) {
     if (tableIdentifier.namespace().length == 0) {
       return gravitinoTableName;
     }

From 63c59044571eb2e23cc00ba80faecb7d46fe0fdf Mon Sep 17 00:00:00 2001
From: caican00
Date: Mon, 13 May 2024 10:16:29 +0800
Subject: [PATCH 6/6] update

---
 docs/spark-connector/spark-catalog-iceberg.md | 25 +++++++++++++++----
 1 file changed, 20 insertions(+), 5 deletions(-)

diff --git a/docs/spark-connector/spark-catalog-iceberg.md b/docs/spark-connector/spark-catalog-iceberg.md
index a50ed4fffe5..45038183edf 100644
--- a/docs/spark-connector/spark-catalog-iceberg.md
+++ b/docs/spark-connector/spark-catalog-iceberg.md
@@ -8,7 +8,7 @@ This software is licensed under the Apache License version 2."

 ## Capabilities

-#### Support basic DML and DDL operations:
+#### Supported DML and DDL operations:

 - `CREATE TABLE`

@@ -18,13 +18,12 @@ Supports basic create table clause including table schema, properties, partition
 - `ALTER TABLE`
 - `INSERT INTO&OVERWRITE`
 - `SELECT`
-- `DELETE`
-
-Supports file delete only.
+- `MERGE INTO`
+- `DELETE FROM`
+- `UPDATE`

 #### Not supported operations:

-- Row level operations. like `MERGE INOT`, `DELETE FROM`, `UPDATE`
 - View operations.
 - Branching and tagging operations.
 - Spark procedures.
@@ -57,4 +56,20 @@ VALUES (3, 'Charlie', 'Sales', TIMESTAMP '2021-03-01 08:45:00');

 SELECT * FROM employee
 WHERE date(hire_date) = '2021-01-01'
+
+UPDATE employee SET department = 'Marketing' WHERE id = 1;
+
+DELETE FROM employee WHERE id < 2;
+
+MERGE INTO employee
+USING (SELECT 4 as id, 'David' as name, 'Engineering' as department, TIMESTAMP '2021-04-01 09:00:00' as hire_date) as new_employee
+ON employee.id = new_employee.id
+WHEN MATCHED THEN UPDATE SET *
+WHEN NOT MATCHED THEN INSERT *;
+
+MERGE INTO employee
+USING (SELECT 4 as id, 'David' as name, 'Engineering' as department, TIMESTAMP '2021-04-01 09:00:00' as hire_date) as new_employee
+ON employee.id = new_employee.id
+WHEN MATCHED THEN DELETE
+WHEN NOT MATCHED THEN INSERT *;
 ```
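
A note on the `cacheEnabled` reflection in `SparkIcebergTable` above: Iceberg's `SparkCatalog` does not expose whether its table cache is enabled, so the connector reads the private field reflectively and passes the negated value to the `SparkTable` constructor as its eager-refresh flag. Below is a minimal, self-contained sketch of that pattern; `DemoCatalog` and `CacheEnabledProbe` are hypothetical stand-ins, not classes from the patch.

```java
import java.lang.reflect.Field;

public class CacheEnabledProbe {
  // Hypothetical stand-in for Iceberg's SparkCatalog, which keeps its cache flag private.
  static class DemoCatalog {
    private final boolean cacheEnabled = true;
  }

  // Mirrors the isCacheEnabled helper in the patch: look up the private field,
  // make it accessible, and read its boolean value from the given instance.
  static boolean isCacheEnabled(Object catalog) {
    try {
      Field field = catalog.getClass().getDeclaredField("cacheEnabled");
      field.setAccessible(true);
      return field.getBoolean(catalog);
    } catch (NoSuchFieldException | IllegalAccessException e) {
      throw new RuntimeException("Failed to get cacheEnabled field", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(isCacheEnabled(new DemoCatalog())); // prints: true
  }
}
```

The trade-off is the usual one for reflection on a private field: it avoids forking or subclassing `SparkCatalog`, but it can break silently if a future Iceberg release renames the field, which is why the helper fails loudly with a `RuntimeException`.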
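
The row-level operations the doc change advertises can also be driven programmatically through `SparkSession.sql`. The following is a hedged sketch, not part of the patch: it assumes a session already configured with the Gravitino Iceberg catalog and an existing `employee` table as in the docs above, and the class and app names are illustrative.

```java
import org.apache.spark.sql.SparkSession;

public class RowLevelOpsExample {
  public static void main(String[] args) {
    // Assumes the Gravitino Iceberg catalog is already configured via
    // spark.sql.catalog.* properties and "employee" exists in it.
    SparkSession spark = SparkSession.builder().appName("row-level-ops").getOrCreate();

    // Row-level UPDATE and DELETE, as documented above.
    spark.sql("UPDATE employee SET department = 'Marketing' WHERE id = 1");
    spark.sql("DELETE FROM employee WHERE id < 2");

    // MERGE INTO with an inline source relation: update matching rows,
    // insert the rest.
    spark.sql(
        "MERGE INTO employee "
            + "USING (SELECT 4 AS id, 'David' AS name, 'Engineering' AS department, "
            + "TIMESTAMP '2021-04-01 09:00:00' AS hire_date) AS new_employee "
            + "ON employee.id = new_employee.id "
            + "WHEN MATCHED THEN UPDATE SET * "
            + "WHEN NOT MATCHED THEN INSERT *");

    spark.sql("SELECT * FROM employee ORDER BY id").show();
  }
}
```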