[#2541] feat(spark-connector): support basic DDL and DML operations to iceberg catalog #2544

Merged: 92 commits into main from caican00:iceberg-read-write (Apr 2, 2024)
Changes from 84 commits

Commits (92)
b18810d
[#2541] feat(spark-connector): support DDL, read and write operations…
caican00 Mar 15, 2024
c4445bd
[#2566] feat(spark-connector): Refactoring integration tests for spar…
caican00 Mar 18, 2024
129eb65
Merge branch 'main' of github.com:datastrato/gravitino into seperate-…
caican00 Mar 18, 2024
c9ab007
[#2566] feat(spark-connector): Refactoring integration tests for spar…
caican00 Mar 19, 2024
a51aae8
Merge branch 'main' of github.com:datastrato/gravitino into seperate-…
caican00 Mar 19, 2024
c226536
Merge branch 'main' into seperate-spark-it
caican00 Mar 19, 2024
b80c366
[#2566] feat(spark-connector): Refactoring integration tests for spar…
caican00 Mar 19, 2024
a905bac
Merge branch 'seperate-spark-it' of github.com:caican00/gravitino int…
caican00 Mar 19, 2024
a7fbb0b
[#2566] feat(spark-connector): Refactoring integration tests for spar…
caican00 Mar 19, 2024
2847dc4
[#2566] feat(spark-connector): Refactoring integration tests for spar…
caican00 Mar 19, 2024
b2f31e8
[#2566] feat(spark-connector): Refactoring integration tests for spar…
caican00 Mar 19, 2024
82a7979
Merge branch 'main' into seperate-spark-it
caican00 Mar 19, 2024
48a9cfc
Merge remote-tracking branch 'upstream_dev/seperate-spark-it' into ic…
caican00 Mar 19, 2024
51b24b6
updated.
caican00 Mar 19, 2024
03dc347
Merge remote-tracking branch 'upstream_dev/seperate-spark-it' into ic…
caican00 Mar 19, 2024
e322776
updated.
caican00 Mar 19, 2024
bd6821c
updated.
caican00 Mar 19, 2024
2633a60
updated.
caican00 Mar 19, 2024
bb9e918
Merge remote-tracking branch 'upstream_dev/seperate-spark-it' into ic…
caican00 Mar 19, 2024
71a660b
Merge branch 'main' into iceberg-read-write
caican00 Mar 20, 2024
81b0590
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-r…
caican00 Mar 20, 2024
7fbf33d
Merge branch 'iceberg-read-write' of github.com:caican00/gravitino in…
caican00 Mar 20, 2024
563b39b
updated.
caican00 Mar 20, 2024
54ec3b9
updated.
caican00 Mar 20, 2024
8969ad6
updated.
caican00 Mar 20, 2024
2c039f9
updated.
caican00 Mar 20, 2024
00dfcdf
updated.
caican00 Mar 20, 2024
e5538c5
updated.
caican00 Mar 20, 2024
7f82286
updated.
caican00 Mar 20, 2024
9d1df67
updated.
caican00 Mar 20, 2024
2a7e122
updated.
caican00 Mar 20, 2024
bc884c3
updated.
caican00 Mar 20, 2024
69696c0
updated.
caican00 Mar 20, 2024
68f6717
updated.
caican00 Mar 21, 2024
a1a856b
Merge branch 'main' into iceberg-read-write
caican00 Mar 21, 2024
957b808
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-r…
caican00 Mar 21, 2024
ec2a22c
Merge branch 'iceberg-read-write' of github.com:caican00/gravitino in…
caican00 Mar 21, 2024
01ae436
fix
caican00 Mar 21, 2024
069f8a3
fix
caican00 Mar 21, 2024
286817c
Merge branch 'main' into iceberg-read-write
caican00 Mar 21, 2024
fc95191
fix
caican00 Mar 21, 2024
252b7b3
Merge branch 'iceberg-read-write' of github.com:caican00/gravitino in…
caican00 Mar 21, 2024
c3fae37
fix
caican00 Mar 22, 2024
6eb07de
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-r…
caican00 Mar 22, 2024
39f11f3
fix
caican00 Mar 22, 2024
49181d2
fix
caican00 Mar 22, 2024
16ec022
fix
caican00 Mar 22, 2024
2ad0a18
fix
caican00 Mar 23, 2024
9186a2f
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-r…
caican00 Mar 24, 2024
99dd725
fix
caican00 Mar 24, 2024
e75e231
fix
caican00 Mar 24, 2024
93a7643
test IT
caican00 Mar 25, 2024
a91249f
fix a IT
caican00 Mar 25, 2024
63a8f40
fix a IT
caican00 Mar 25, 2024
b893ffe
Merge branch 'main' into iceberg-read-write
caican00 Mar 25, 2024
d84e6a3
fix an IT
caican00 Mar 25, 2024
343086c
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-r…
caican00 Mar 25, 2024
48c41fc
Merge branch 'iceberg-read-write' of github.com:caican00/gravitino in…
caican00 Mar 25, 2024
ec35db0
update
caican00 Mar 25, 2024
af39f83
update
caican00 Mar 25, 2024
4d8598a
update
caican00 Mar 25, 2024
110964c
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-r…
caican00 Mar 26, 2024
4d3b50f
fix comment
caican00 Mar 26, 2024
2bc141b
Merge branch 'main' into iceberg-read-write
caican00 Mar 26, 2024
4004edc
fix IT
caican00 Mar 26, 2024
5d23c4e
Merge branch 'iceberg-read-write' of github.com:caican00/gravitino in…
caican00 Mar 26, 2024
dacb13c
Merge branch 'main' into iceberg-read-write
caican00 Mar 26, 2024
e7149b0
fix
caican00 Mar 27, 2024
ca56dcf
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-r…
caican00 Mar 27, 2024
f2d0fae
Merge branch 'iceberg-read-write' of github.com:caican00/gravitino in…
caican00 Mar 27, 2024
e434133
fix
caican00 Mar 27, 2024
df700cc
fix
caican00 Mar 27, 2024
74c7d2e
fix
caican00 Mar 27, 2024
03a5b4b
Merge branch 'main' into iceberg-read-write
caican00 Mar 27, 2024
eeb6318
fix
caican00 Mar 27, 2024
2d09006
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-r…
caican00 Mar 27, 2024
e4ad4d4
Merge branch 'iceberg-read-write' of github.com:caican00/gravitino in…
caican00 Mar 27, 2024
2a994c7
fix
caican00 Mar 27, 2024
b8e61dc
Merge branch 'main' into iceberg-read-write
caican00 Mar 27, 2024
ad98ce3
Merge branch 'main' into iceberg-read-write
caican00 Mar 28, 2024
99b0909
Merge branch 'main' into iceberg-read-write
caican00 Mar 28, 2024
a337e14
Merge remote-tracking branch 'upstream/main' into iceberg-read-write
caican00 Mar 31, 2024
a2e8efd
update
caican00 Mar 31, 2024
69a7af6
update
caican00 Mar 31, 2024
3d494ac
update
caican00 Apr 1, 2024
2301e35
update
caican00 Apr 1, 2024
8f632cb
Merge branch 'main' into iceberg-read-write
caican00 Apr 1, 2024
71577ed
update
caican00 Apr 2, 2024
d64e251
Merge branch 'main' into iceberg-read-write
caican00 Apr 2, 2024
8f13efc
update
caican00 Apr 2, 2024
509b0d6
update
caican00 Apr 2, 2024
d136e7f
Merge branch 'main' into iceberg-read-write
caican00 Apr 2, 2024
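Before the diff, a quick orientation: this PR teaches the Gravitino Spark connector to route basic DDL and DML to an Iceberg catalog. The sketch below is a minimal, hypothetical usage example assembled from the test setup in this PR, not code from the change set itself. The plugin class name comes from the diff; the literal config keys, the catalog name `iceberg`, the metalake name, and the URIs are assumptions (the integration tests reference the keys via `GravitinoSparkConfig` constants), and a running Gravitino server plus Hive metastore are required.

// Minimal, hypothetical usage sketch (not part of the PR): how a Spark job
// could run DDL/DML against the Iceberg catalog once this support lands.
// Assumptions: Gravitino at 127.0.0.1:8090, a metalake named "test", and a
// catalog named "iceberg" created with provider "lakehouse-iceberg". The
// literal config key strings are assumptions; the ITs use GravitinoSparkConfig
// constants instead. The plugin class name is taken from the diff below.
import org.apache.spark.sql.SparkSession;

public class IcebergCatalogExample {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .master("local[*]")
            .appName("gravitino-iceberg-example")
            .config(
                "spark.plugins",
                "com.datastrato.gravitino.spark.connector.plugin.GravitinoSparkPlugin")
            .config("spark.sql.gravitino.uri", "http://127.0.0.1:8090") // assumed key
            .config("spark.sql.gravitino.metalake", "test") // assumed key
            .enableHiveSupport()
            .getOrCreate();

    // Basic DDL and DML, mirroring what SparkCommonIT exercises.
    spark.sql("USE iceberg");
    spark.sql("CREATE DATABASE IF NOT EXISTS demo_db WITH DBPROPERTIES (ID=001)");
    spark.sql("USE demo_db");
    spark.sql("CREATE TABLE IF NOT EXISTS demo_tbl (id INT, name STRING)");
    spark.sql("INSERT INTO demo_tbl VALUES (1, 'a'), (2, 'b')");
    spark.sql("SELECT * FROM demo_tbl").show();

    spark.stop();
  }
}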
Files changed
@@ -314,6 +314,11 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty
*/
@Override
public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException {
+    NameIdentifier schemaIdent = NameIdentifier.of(namespace.levels());
+    if (!schemaExists(schemaIdent)) {
+      throw new NoSuchSchemaException("Schema (database) does not exist %s", namespace);
+    }

try {
ListTablesResponse listTablesResponse =
icebergTableOps.listTable(IcebergTableOpsHelper.getIcebergNamespace(namespace));
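With this check, listTables fails fast with Gravitino's NoSuchSchemaException when the target schema is missing, instead of surfacing whatever error the underlying Iceberg table listing would raise.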
integration-test/build.gradle.kts (1 change: 0 additions & 1 deletion)
@@ -30,7 +30,6 @@ dependencies {
testImplementation(project(":server"))
testImplementation(project(":server-common"))
testImplementation(project(":spark-connector")) {
exclude("org.apache.iceberg")
FANNG1 marked this conversation as resolved.
Show resolved Hide resolved
exclude("org.apache.hadoop", "hadoop-client-api")
exclude("org.apache.hadoop", "hadoop-client-runtime")
}
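Removing the org.apache.iceberg exclusion presumably lets the integration tests resolve the Iceberg classes that the spark-connector now depends on for the Iceberg catalog support.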
SparkCommonIT.java
@@ -22,12 +22,12 @@
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIf;
-import org.junit.platform.commons.util.StringUtils;

public abstract class SparkCommonIT extends SparkEnvIT {

@@ -61,6 +61,8 @@ private static String getInsertWithPartitionSql(
// Whether supports [CLUSTERED BY col_name3 SORTED BY col_name INTO num_buckets BUCKETS]
protected abstract boolean supportsSparkSQLClusteredBy();

+  protected abstract boolean supportsPartition();

// Use a custom database not the original default database because SparkIT couldn't read&write
// data to tables in default database. The main reason is default database location is
// determined by `hive.metastore.warehouse.dir` in hive-site.xml which is local HDFS address
@@ -79,6 +81,13 @@ void init() {
sql("USE " + getDefaultDatabase());
}

+  @AfterAll
+  void cleanUp() {
+    sql("USE " + getCatalogName());
+    getDatabases()
+        .forEach(database -> sql(String.format("DROP DATABASE IF EXISTS %s CASCADE", database)));
+  }

@Test
void testLoadCatalogs() {
Set<String> catalogs = getCatalogs();
@@ -89,20 +98,20 @@ void testLoadCatalogs() {
void testCreateAndLoadSchema() {
String testDatabaseName = "t_create1";
dropDatabaseIfExists(testDatabaseName);
sql("CREATE DATABASE " + testDatabaseName);
sql("CREATE DATABASE " + testDatabaseName + " WITH DBPROPERTIES (ID=001);");
caican00 marked this conversation as resolved.
Show resolved Hide resolved
Map<String, String> databaseMeta = getDatabaseMetadata(testDatabaseName);
Assertions.assertFalse(databaseMeta.containsKey("Comment"));
Assertions.assertTrue(databaseMeta.containsKey("Location"));
Assertions.assertEquals("datastrato", databaseMeta.get("Owner"));
String properties = databaseMeta.get("Properties");
-    Assertions.assertTrue(StringUtils.isBlank(properties));
+    Assertions.assertTrue(properties.contains("(ID,001)"));

testDatabaseName = "t_create2";
dropDatabaseIfExists(testDatabaseName);
String testDatabaseLocation = "/tmp/" + testDatabaseName;
sql(
String.format(
"CREATE DATABASE %s COMMENT 'comment' LOCATION '%s'\n" + " WITH DBPROPERTIES (ID=001);",
"CREATE DATABASE %s COMMENT 'comment' LOCATION '%s'\n" + " WITH DBPROPERTIES (ID=002);",
testDatabaseName, testDatabaseLocation));
databaseMeta = getDatabaseMetadata(testDatabaseName);
String comment = databaseMeta.get("Comment");
@@ -111,19 +120,22 @@ void testCreateAndLoadSchema() {
// underlying catalog may change /tmp/t_create2 to file:/tmp/t_create2
Assertions.assertTrue(databaseMeta.get("Location").contains(testDatabaseLocation));
properties = databaseMeta.get("Properties");
Assertions.assertEquals("((ID,001))", properties);
Assertions.assertTrue(properties.contains("(ID,002)"));
}

@Test
void testAlterSchema() {
String testDatabaseName = "t_alter";
dropDatabaseIfExists(testDatabaseName);
sql("CREATE DATABASE " + testDatabaseName);
sql("CREATE DATABASE " + testDatabaseName + " WITH DBPROPERTIES (ID=001);");
     Assertions.assertTrue(
-        StringUtils.isBlank(getDatabaseMetadata(testDatabaseName).get("Properties")));
+        getDatabaseMetadata(testDatabaseName).get("Properties").contains("(ID,001)"));

sql(String.format("ALTER DATABASE %s SET DBPROPERTIES ('ID'='001')", testDatabaseName));
Assertions.assertEquals("((ID,001))", getDatabaseMetadata(testDatabaseName).get("Properties"));
sql(String.format("ALTER DATABASE %s SET DBPROPERTIES ('ID'='002')", testDatabaseName));
Assertions.assertFalse(
getDatabaseMetadata(testDatabaseName).get("Properties").contains("(ID,001)"));
Assertions.assertTrue(
getDatabaseMetadata(testDatabaseName).get("Properties").contains("(ID,002)"));

// Hive metastore doesn't support alter database location, therefore this test method
// doesn't verify ALTER DATABASE database_name SET LOCATION 'new_location'.
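The schema-property assertions above switch from exact string equality to contains checks, presumably because different catalog backends report additional default properties alongside the ones the test sets.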
@@ -334,9 +346,9 @@ void testAlterTableUpdateColumnType() {
checkTableColumns(tableName, simpleTableColumns, getTableInfo(tableName));

sql(String.format("ALTER TABLE %S ADD COLUMNS (col1 int)", tableName));
sql(String.format("ALTER TABLE %S CHANGE COLUMN col1 col1 string", tableName));
sql(String.format("ALTER TABLE %S CHANGE COLUMN col1 col1 bigint", tableName));
ArrayList<SparkColumnInfo> updateColumns = new ArrayList<>(simpleTableColumns);
updateColumns.add(SparkColumnInfo.of("col1", DataTypes.StringType, null));
updateColumns.add(SparkColumnInfo.of("col1", DataTypes.LongType, null));
checkTableColumns(tableName, updateColumns, getTableInfo(tableName));
}

@@ -354,7 +366,7 @@ void testAlterTableRenameColumn() {
sql(String.format("ALTER TABLE %S ADD COLUMNS (col1 int)", tableName));
sql(
String.format(
"ALTER TABLE %S RENAME COLUMN %S TO %S", tableName, oldColumnName, newColumnName));
"ALTER TABLE %s RENAME COLUMN %s TO %s", tableName, oldColumnName, newColumnName));
ArrayList<SparkColumnInfo> renameColumns = new ArrayList<>(simpleTableColumns);
renameColumns.add(SparkColumnInfo.of(newColumnName, DataTypes.IntegerType, null));
checkTableColumns(tableName, renameColumns, getTableInfo(tableName));
@@ -373,7 +385,7 @@ void testUpdateColumnPosition() {

sql(
String.format(
"CREATE TABLE %s (id STRING COMMENT '', name STRING COMMENT '', age STRING COMMENT '') USING PARQUET",
"CREATE TABLE %s (id STRING COMMENT '', name STRING COMMENT '', age STRING COMMENT '')",
tableName));
checkTableColumns(tableName, simpleTableColumns, getTableInfo(tableName));

@@ -456,12 +468,13 @@ void testComplexType() {
}

@Test
@EnabledIf("supportsPartition")
void testCreateDatasourceFormatPartitionTable() {
String tableName = "datasource_partition_table";

dropTableIfExists(tableName);
String createTableSQL = getCreateSimpleTableString(tableName);
createTableSQL = createTableSQL + "USING PARQUET PARTITIONED BY (name, age)";
createTableSQL = createTableSQL + " USING PARQUET PARTITIONED BY (name, age)";
sql(createTableSQL);
SparkTableInfo tableInfo = getTableInfo(tableName);
SparkTableInfoChecker checker =
@@ -558,6 +571,7 @@ void testInsertTableAsSelect() {
}

@Test
@EnabledIf("supportsPartition")
void testInsertDatasourceFormatPartitionTableAsSelect() {
String tableName = "insert_select_partition_table";
String newTableName = "new_" + tableName;
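Both partition-specific tests in SparkCommonIT are now gated on @EnabledIf("supportsPartition"), so subclasses for catalogs without partition support (like the new Iceberg IT below) skip them instead of failing.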
SparkEnvIT.java
@@ -12,6 +12,7 @@
import com.datastrato.gravitino.integration.test.container.HiveContainer;
import com.datastrato.gravitino.integration.test.util.spark.SparkUtilIT;
import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig;
+import com.datastrato.gravitino.spark.connector.iceberg.IcebergPropertiesConstants;
import com.datastrato.gravitino.spark.connector.plugin.GravitinoSparkPlugin;
import com.google.common.collect.Maps;
import java.io.IOException;
@@ -37,6 +38,7 @@ public abstract class SparkEnvIT extends SparkUtilIT {
private SparkSession sparkSession;
private String hiveMetastoreUri = "thrift://127.0.0.1:9083";
private String gravitinoUri = "http://127.0.0.1:8090";
+  private String warehouse;

protected abstract String getCatalogName();

@@ -79,8 +81,18 @@ private void initMetalakeAndCatalogs() {
client.createMetalake(NameIdentifier.of(metalakeName), "", Collections.emptyMap());
GravitinoMetalake metalake = client.loadMetalake(NameIdentifier.of(metalakeName));
Map<String, String> properties = Maps.newHashMap();
-    properties.put(GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI, hiveMetastoreUri);

+    switch (getProvider()) {
+      case "hive":
+        properties.put(GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI, hiveMetastoreUri);
+        break;
+      case "lakehouse-iceberg":
+        properties.put(IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_BACKEND, "hive");
+        properties.put(IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_WAREHOUSE, warehouse);
+        properties.put(IcebergPropertiesConstants.LAKEHOUSE_ICEBERG_CATALOG_URI, hiveMetastoreUri);
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported provider: " + getProvider());
+    }
metalake.createCatalog(
NameIdentifier.of(metalakeName, getCatalogName()),
Catalog.Type.RELATIONAL,
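Catalog creation now branches on the provider: a Hive catalog only needs the metastore URI, while a lakehouse-iceberg catalog additionally needs a backend type and a warehouse location (here a Hive-backed Iceberg catalog over HDFS).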
@@ -102,6 +114,11 @@ private void initHiveEnv() {
"thrift://%s:%d",
containerSuite.getHiveContainer().getContainerIpAddress(),
HiveContainer.HIVE_METASTORE_PORT);
+    warehouse =
+        String.format(
+            "hdfs://%s:%d/user/hive/warehouse",
+            containerSuite.getHiveContainer().getContainerIpAddress(),
+            HiveContainer.HDFS_DEFAULTFS_PORT);
}

private void initHdfsFileSystem() {
@@ -129,12 +146,7 @@ private void initSparkEnv() {
.config(GravitinoSparkConfig.GRAVITINO_URI, gravitinoUri)
.config(GravitinoSparkConfig.GRAVITINO_METALAKE, metalakeName)
.config("hive.exec.dynamic.partition.mode", "nonstrict")
-        .config(
-            "spark.sql.warehouse.dir",
-            String.format(
-                "hdfs://%s:%d/user/hive/warehouse",
-                containerSuite.getHiveContainer().getContainerIpAddress(),
-                HiveContainer.HDFS_DEFAULTFS_PORT))
+        .config("spark.sql.warehouse.dir", warehouse)
.enableHiveSupport()
.getOrCreate();
}
@@ -40,6 +40,11 @@ protected boolean supportsSparkSQLClusteredBy() {
return true;
}

+  @Override
+  protected boolean supportsPartition() {
+    return true;
+  }

@Test
public void testCreateHiveFormatPartitionTable() {
String tableName = "hive_partition_table";
SparkIcebergCatalogIT.java (new file)
@@ -0,0 +1,34 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.integration.test.spark.iceberg;

import com.datastrato.gravitino.integration.test.spark.SparkCommonIT;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInstance;

@Tag("gravitino-docker-it")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class SparkIcebergCatalogIT extends SparkCommonIT {

@Override
protected String getCatalogName() {
return "iceberg";
}

@Override
protected String getProvider() {
return "lakehouse-iceberg";
}

@Override
protected boolean supportsSparkSQLClusteredBy() {
return false;
}

@Override
protected boolean supportsPartition() {
return false;
}
}
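The new Iceberg integration test reuses the whole SparkCommonIT suite but opts out of CLUSTERED BY and partition DDL coverage for now, matching the "basic DDL and DML" scope of this PR.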
@@ -6,6 +6,7 @@
package com.datastrato.gravitino.spark.connector;

import com.datastrato.gravitino.spark.connector.hive.HiveAdaptor;
+import com.datastrato.gravitino.spark.connector.iceberg.IcebergAdaptor;
import java.util.Locale;

/**
@@ -17,6 +18,8 @@ public static GravitinoCatalogAdaptor createGravitinoAdaptor(String provider) {
switch (provider.toLowerCase(Locale.ROOT)) {
case "hive":
return new HiveAdaptor();
case "lakehouse-iceberg":
return new IcebergAdaptor();
default:
throw new RuntimeException(String.format("Provider:%s is not supported yet", provider));
}
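The adaptor factory now recognizes lakehouse-iceberg in addition to hive, returning an IcebergAdaptor that plugs Iceberg-specific catalog behavior into the connector.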
GravitinoCatalogManager.java
@@ -58,6 +58,7 @@ public void close() {
Preconditions.checkState(!isClosed, "Gravitino Catalog is already closed");
isClosed = true;
gravitinoClient.close();
+    gravitinoCatalogManager = null;
}

public Catalog getGravitinoCatalogInfo(String name) {
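Nulling out the cached manager reference on close() presumably allows a fresh GravitinoCatalogManager to be created afterwards, instead of handing out the already-closed instance.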