From 9d3978167f8659548d93825e47b95edffcf1819e Mon Sep 17 00:00:00 2001 From: Yun Zou Date: Tue, 8 Apr 2025 15:27:06 -0700 Subject: [PATCH 1/7] add integration test --- .../ext/PolarisSparkIntegrationTestBase.java | 246 ++++++++++++++++++ .../it/test/PolarisSparkIntegrationTest.java | 185 +------------ plugins/pluginlibs.versions.toml | 1 - .../spark/v3.5/integration/build.gradle.kts | 145 +++++++++++ .../spark/quarkus/it/SparkIcebergIT.java | 60 +++++ .../quarkus/it/SparkIntegrationBase.java | 68 +++++ ...olaris.service.it.ext.PolarisServerManager | 20 ++ .../spark/v3.5/{ => spark}/build.gradle.kts | 1 + .../apache/polaris/spark/PolarisCatalog.java | 0 .../polaris/spark/PolarisRESTCatalog.java | 0 .../polaris/spark/PolarisSparkCatalog.java | 0 .../apache/polaris/spark/SparkCatalog.java | 0 .../rest/CreateGenericTableRESTRequest.java | 0 .../rest/LoadGenericTableRESTResponse.java | 0 .../polaris/spark/utils/DeltaHelper.java | 0 .../spark/utils/PolarisCatalogUtils.java | 0 .../polaris/spark/NoopDeltaCatalog.java | 0 .../polaris/spark/PolarisInMemoryCatalog.java | 0 .../polaris/spark/SparkCatalogTest.java | 0 .../spark/rest/DeserializationTest.java | 0 .../quarkus/it/QuarkusServerManager.java | 1 + settings.gradle.kts | 15 +- 22 files changed, 555 insertions(+), 187 deletions(-) create mode 100644 integration-tests/src/main/java/org/apache/polaris/service/it/ext/PolarisSparkIntegrationTestBase.java create mode 100644 plugins/spark/v3.5/integration/build.gradle.kts create mode 100644 plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIcebergIT.java create mode 100644 plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java create mode 100644 plugins/spark/v3.5/integration/src/intTest/resources/META-INF/services/org.apache.polaris.service.it.ext.PolarisServerManager rename plugins/spark/v3.5/{ => spark}/build.gradle.kts (99%) rename plugins/spark/v3.5/{ => spark}/src/main/java/org/apache/polaris/spark/PolarisCatalog.java (100%) rename plugins/spark/v3.5/{ => spark}/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java (100%) rename plugins/spark/v3.5/{ => spark}/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java (100%) rename plugins/spark/v3.5/{ => spark}/src/main/java/org/apache/polaris/spark/SparkCatalog.java (100%) rename plugins/spark/v3.5/{ => spark}/src/main/java/org/apache/polaris/spark/rest/CreateGenericTableRESTRequest.java (100%) rename plugins/spark/v3.5/{ => spark}/src/main/java/org/apache/polaris/spark/rest/LoadGenericTableRESTResponse.java (100%) rename plugins/spark/v3.5/{ => spark}/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java (100%) rename plugins/spark/v3.5/{ => spark}/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java (100%) rename plugins/spark/v3.5/{ => spark}/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java (100%) rename plugins/spark/v3.5/{ => spark}/src/test/java/org/apache/polaris/spark/PolarisInMemoryCatalog.java (100%) rename plugins/spark/v3.5/{ => spark}/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java (100%) rename plugins/spark/v3.5/{ => spark}/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java (100%) diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/ext/PolarisSparkIntegrationTestBase.java b/integration-tests/src/main/java/org/apache/polaris/service/it/ext/PolarisSparkIntegrationTestBase.java new file mode 100644 index 0000000000..c2ec2021f6 --- 
/dev/null +++ b/integration-tests/src/main/java/org/apache/polaris/service/it/ext/PolarisSparkIntegrationTestBase.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.it.ext; + +import org.junit.jupiter.api.extension.ExtendWith; +import static org.apache.polaris.service.it.env.PolarisClient.polarisClient; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.adobe.testing.s3mock.testcontainers.S3MockContainer; +import com.google.common.collect.ImmutableMap; +import jakarta.ws.rs.client.Entity; +import jakarta.ws.rs.core.Response; +import java.io.IOException; +import java.net.URI; +import java.nio.file.Path; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.rest.requests.ImmutableRegisterTableRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.polaris.core.admin.model.AwsStorageConfigInfo; +import org.apache.polaris.core.admin.model.Catalog; +import org.apache.polaris.core.admin.model.CatalogProperties; +import org.apache.polaris.core.admin.model.ExternalCatalog; +import org.apache.polaris.core.admin.model.PolarisCatalog; +import org.apache.polaris.core.admin.model.StorageConfigInfo; +import org.apache.polaris.service.it.env.CatalogApi; +import org.apache.polaris.service.it.env.ClientCredentials; +import org.apache.polaris.service.it.env.IntegrationTestsHelper; +import org.apache.polaris.service.it.env.ManagementApi; +import org.apache.polaris.service.it.env.PolarisApiEndpoints; +import org.apache.polaris.service.it.env.PolarisClient; +import org.apache.polaris.service.it.ext.PolarisIntegrationTestExtension; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.intellij.lang.annotations.Language; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.LoggerFactory; + +@ExtendWith(PolarisIntegrationTestExtension.class) +public abstract class PolarisSparkIntegrationTestBase { + protected static final S3MockContainer s3Container = + new S3MockContainer("3.11.0").withInitialBuckets("my-bucket,my-old-bucket"); + protected static SparkSession spark; + protected PolarisApiEndpoints endpoints; + protected PolarisClient client; + protected ManagementApi managementApi; + protected CatalogApi catalogApi; + protected String sparkToken; + protected String catalogName; + protected String 
externalCatalogName; + + protected URI warehouseDir; + + @BeforeAll + public static void setup() throws IOException { + s3Container.start(); + } + + @AfterAll + public static void cleanup() { + s3Container.stop(); + } + + @BeforeEach + public void before( + PolarisApiEndpoints apiEndpoints, ClientCredentials credentials, @TempDir Path tempDir) { + endpoints = apiEndpoints; + client = polarisClient(endpoints); + sparkToken = client.obtainToken(credentials); + managementApi = client.managementApi(credentials); + catalogApi = client.catalogApi(credentials); + + warehouseDir = IntegrationTestsHelper.getTemporaryDirectory(tempDir).resolve("spark-warehouse"); + + catalogName = client.newEntityName("spark_catalog"); + externalCatalogName = client.newEntityName("spark_ext_catalog"); + + AwsStorageConfigInfo awsConfigModel = + AwsStorageConfigInfo.builder() + .setRoleArn("arn:aws:iam::123456789012:role/my-role") + .setExternalId("externalId") + .setUserArn("userArn") + .setStorageType(StorageConfigInfo.StorageTypeEnum.S3) + .setAllowedLocations(List.of("s3://my-old-bucket/path/to/data")) + .build(); + CatalogProperties props = new CatalogProperties("s3://my-bucket/path/to/data"); + props.putAll( + Map.of( + "table-default.s3.endpoint", + s3Container.getHttpEndpoint(), + "table-default.s3.path-style-access", + "true", + "table-default.s3.access-key-id", + "foo", + "table-default.s3.secret-access-key", + "bar", + "s3.endpoint", + s3Container.getHttpEndpoint(), + "s3.path-style-access", + "true", + "s3.access-key-id", + "foo", + "s3.secret-access-key", + "bar")); + Catalog catalog = + PolarisCatalog.builder() + .setType(Catalog.TypeEnum.INTERNAL) + .setName(catalogName) + .setProperties(props) + .setStorageConfigInfo(awsConfigModel) + .build(); + + managementApi.createCatalog(catalog); + + CatalogProperties externalProps = new CatalogProperties("s3://my-bucket/path/to/data"); + externalProps.putAll( + Map.of( + "table-default.s3.endpoint", + s3Container.getHttpEndpoint(), + "table-default.s3.path-style-access", + "true", + "table-default.s3.access-key-id", + "foo", + "table-default.s3.secret-access-key", + "bar", + "s3.endpoint", + s3Container.getHttpEndpoint(), + "s3.path-style-access", + "true", + "s3.access-key-id", + "foo", + "s3.secret-access-key", + "bar")); + Catalog externalCatalog = + ExternalCatalog.builder() + .setType(Catalog.TypeEnum.EXTERNAL) + .setName(externalCatalogName) + .setProperties(externalProps) + .setStorageConfigInfo(awsConfigModel) + .build(); + + managementApi.createCatalog(externalCatalog); + + SparkSession.Builder sessionBuilder = + SparkSession.builder() + .master("local[1]") + .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") + .config( + "spark.hadoop.fs.s3.aws.credentials.provider", + "org.apache.hadoop.fs.s3.TemporaryAWSCredentialsProvider") + .config("spark.hadoop.fs.s3.access.key", "foo") + .config("spark.hadoop.fs.s3.secret.key", "bar") + .config( + "spark.sql.extensions", + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + .config("spark.ui.showConsoleProgress", false) + .config("spark.ui.enabled", "false"); + spark = + withCatalog(withCatalog(sessionBuilder, catalogName), externalCatalogName).getOrCreate(); + + onSpark("USE " + catalogName); + } + + protected SparkSession.Builder withCatalog(SparkSession.Builder builder, String catalogName) { + return builder + .config( + String.format("spark.sql.catalog.%s", catalogName), + "org.apache.iceberg.spark.SparkCatalog") + .config("spark.sql.warehouse.dir", 
warehouseDir.toString()) + .config(String.format("spark.sql.catalog.%s.type", catalogName), "rest") + .config( + String.format("spark.sql.catalog.%s.uri", catalogName), + endpoints.catalogApiEndpoint().toString()) + .config(String.format("spark.sql.catalog.%s.warehouse", catalogName), catalogName) + .config(String.format("spark.sql.catalog.%s.scope", catalogName), "PRINCIPAL_ROLE:ALL") + .config( + String.format("spark.sql.catalog.%s.header.realm", catalogName), endpoints.realmId()) + .config(String.format("spark.sql.catalog.%s.token", catalogName), sparkToken) + .config(String.format("spark.sql.catalog.%s.s3.access-key-id", catalogName), "fakekey") + .config( + String.format("spark.sql.catalog.%s.s3.secret-access-key", catalogName), "fakesecret") + .config(String.format("spark.sql.catalog.%s.s3.region", catalogName), "us-west-2"); + } + + @AfterEach + public void after() throws Exception { + cleanupCatalog(catalogName); + cleanupCatalog(externalCatalogName); + try { + SparkSession.clearDefaultSession(); + SparkSession.clearActiveSession(); + spark.close(); + } catch (Exception e) { + LoggerFactory.getLogger(getClass()).error("Unable to close spark session", e); + } + + client.close(); + } + + protected void cleanupCatalog(String catalogName) { + onSpark("USE " + catalogName); + List<Row> namespaces = onSpark("SHOW NAMESPACES").collectAsList(); + for (Row namespace : namespaces) { + List<Row> tables = onSpark("SHOW TABLES IN " + namespace.getString(0)).collectAsList(); + for (Row table : tables) { + onSpark("DROP TABLE " + namespace.getString(0) + "." + table.getString(1)); + } + List<Row> views = onSpark("SHOW VIEWS IN " + namespace.getString(0)).collectAsList(); + for (Row view : views) { + onSpark("DROP VIEW " + namespace.getString(0) + "." + view.getString(1)); + } + onSpark("DROP NAMESPACE " + namespace.getString(0)); + } + + managementApi.deleteCatalog(catalogName); + } + + protected static Dataset<Row> onSpark(@Language("SQL") String sql) { + return spark.sql(sql); + } +} \ No newline at end of file diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisSparkIntegrationTest.java b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisSparkIntegrationTest.java index 10868c1ea9..415df8839a 100644 --- a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisSparkIntegrationTest.java +++ b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisSparkIntegrationTest.java @@ -47,6 +47,7 @@ import org.apache.polaris.service.it.env.PolarisApiEndpoints; import org.apache.polaris.service.it.env.PolarisClient; import org.apache.polaris.service.it.ext.PolarisIntegrationTestExtension; +import org.apache.polaris.service.it.ext.PolarisSparkIntegrationTestBase; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; @@ -71,185 +72,7 @@ * {@code true} * */ -@ExtendWith(PolarisIntegrationTestExtension.class) -public class PolarisSparkIntegrationTest { - - private static final S3MockContainer s3Container = - new S3MockContainer("3.11.0").withInitialBuckets("my-bucket,my-old-bucket"); - private static SparkSession spark; - private PolarisApiEndpoints endpoints; - private PolarisClient client; - private ManagementApi managementApi; - private CatalogApi catalogApi; - private String sparkToken; - private String catalogName; - private String externalCatalogName; - - private URI warehouseDir; - - @BeforeAll - public static void setup() throws IOException { - s3Container.start(); - } - 
- @AfterAll - public static void cleanup() { - s3Container.stop(); - } - - @BeforeEach - public void before( - PolarisApiEndpoints apiEndpoints, ClientCredentials credentials, @TempDir Path tempDir) { - endpoints = apiEndpoints; - client = polarisClient(endpoints); - sparkToken = client.obtainToken(credentials); - managementApi = client.managementApi(credentials); - catalogApi = client.catalogApi(credentials); - - warehouseDir = IntegrationTestsHelper.getTemporaryDirectory(tempDir).resolve("spark-warehouse"); - - catalogName = client.newEntityName("spark_catalog"); - externalCatalogName = client.newEntityName("spark_ext_catalog"); - - AwsStorageConfigInfo awsConfigModel = - AwsStorageConfigInfo.builder() - .setRoleArn("arn:aws:iam::123456789012:role/my-role") - .setExternalId("externalId") - .setUserArn("userArn") - .setStorageType(StorageConfigInfo.StorageTypeEnum.S3) - .setAllowedLocations(List.of("s3://my-old-bucket/path/to/data")) - .build(); - CatalogProperties props = new CatalogProperties("s3://my-bucket/path/to/data"); - props.putAll( - Map.of( - "table-default.s3.endpoint", - s3Container.getHttpEndpoint(), - "table-default.s3.path-style-access", - "true", - "table-default.s3.access-key-id", - "foo", - "table-default.s3.secret-access-key", - "bar", - "s3.endpoint", - s3Container.getHttpEndpoint(), - "s3.path-style-access", - "true", - "s3.access-key-id", - "foo", - "s3.secret-access-key", - "bar")); - Catalog catalog = - PolarisCatalog.builder() - .setType(Catalog.TypeEnum.INTERNAL) - .setName(catalogName) - .setProperties(props) - .setStorageConfigInfo(awsConfigModel) - .build(); - - managementApi.createCatalog(catalog); - - CatalogProperties externalProps = new CatalogProperties("s3://my-bucket/path/to/data"); - externalProps.putAll( - Map.of( - "table-default.s3.endpoint", - s3Container.getHttpEndpoint(), - "table-default.s3.path-style-access", - "true", - "table-default.s3.access-key-id", - "foo", - "table-default.s3.secret-access-key", - "bar", - "s3.endpoint", - s3Container.getHttpEndpoint(), - "s3.path-style-access", - "true", - "s3.access-key-id", - "foo", - "s3.secret-access-key", - "bar")); - Catalog externalCatalog = - ExternalCatalog.builder() - .setType(Catalog.TypeEnum.EXTERNAL) - .setName(externalCatalogName) - .setProperties(externalProps) - .setStorageConfigInfo(awsConfigModel) - .build(); - - managementApi.createCatalog(externalCatalog); - - SparkSession.Builder sessionBuilder = - SparkSession.builder() - .master("local[1]") - .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") - .config( - "spark.hadoop.fs.s3.aws.credentials.provider", - "org.apache.hadoop.fs.s3.TemporaryAWSCredentialsProvider") - .config("spark.hadoop.fs.s3.access.key", "foo") - .config("spark.hadoop.fs.s3.secret.key", "bar") - .config( - "spark.sql.extensions", - "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") - .config("spark.ui.showConsoleProgress", false) - .config("spark.ui.enabled", "false"); - spark = - withCatalog(withCatalog(sessionBuilder, catalogName), externalCatalogName).getOrCreate(); - - onSpark("USE " + catalogName); - } - - private SparkSession.Builder withCatalog(SparkSession.Builder builder, String catalogName) { - return builder - .config( - String.format("spark.sql.catalog.%s", catalogName), - "org.apache.iceberg.spark.SparkCatalog") - .config("spark.sql.warehouse.dir", warehouseDir.toString()) - .config(String.format("spark.sql.catalog.%s.type", catalogName), "rest") - .config( - String.format("spark.sql.catalog.%s.uri", 
catalogName), - endpoints.catalogApiEndpoint().toString()) - .config(String.format("spark.sql.catalog.%s.warehouse", catalogName), catalogName) - .config(String.format("spark.sql.catalog.%s.scope", catalogName), "PRINCIPAL_ROLE:ALL") - .config( - String.format("spark.sql.catalog.%s.header.realm", catalogName), endpoints.realmId()) - .config(String.format("spark.sql.catalog.%s.token", catalogName), sparkToken) - .config(String.format("spark.sql.catalog.%s.s3.access-key-id", catalogName), "fakekey") - .config( - String.format("spark.sql.catalog.%s.s3.secret-access-key", catalogName), "fakesecret") - .config(String.format("spark.sql.catalog.%s.s3.region", catalogName), "us-west-2"); - } - - @AfterEach - public void after() throws Exception { - cleanupCatalog(catalogName); - cleanupCatalog(externalCatalogName); - try { - SparkSession.clearDefaultSession(); - SparkSession.clearActiveSession(); - spark.close(); - } catch (Exception e) { - LoggerFactory.getLogger(getClass()).error("Unable to close spark session", e); - } - - client.close(); - } - - private void cleanupCatalog(String catalogName) { - onSpark("USE " + catalogName); - List<Row> namespaces = onSpark("SHOW NAMESPACES").collectAsList(); - for (Row namespace : namespaces) { - List<Row> tables = onSpark("SHOW TABLES IN " + namespace.getString(0)).collectAsList(); - for (Row table : tables) { - onSpark("DROP TABLE " + namespace.getString(0) + "." + table.getString(1)); - } - List<Row> views = onSpark("SHOW VIEWS IN " + namespace.getString(0)).collectAsList(); - for (Row view : views) { - onSpark("DROP VIEW " + namespace.getString(0) + "." + view.getString(1)); - } - onSpark("DROP NAMESPACE " + namespace.getString(0)); - } - - managementApi.deleteCatalog(catalogName); - } +public class PolarisSparkIntegrationTest extends PolarisSparkIntegrationTestBase { @Test public void testCreateTable() { @@ -363,8 +186,4 @@ private LoadTableResponse loadTable(String catalog, String namespace, String tab return response.readEntity(LoadTableResponse.class); } } - - private static Dataset<Row> onSpark(@Language("SQL") String sql) { - return spark.sql(sql); - } } diff --git a/plugins/pluginlibs.versions.toml b/plugins/pluginlibs.versions.toml index e48f6ef45a..4f7288ff64 100644 --- a/plugins/pluginlibs.versions.toml +++ b/plugins/pluginlibs.versions.toml @@ -22,4 +22,3 @@ iceberg = "1.8.1" spark35 = "3.5.5" scala212 = "2.12.19" scala213 = "2.13.15" - diff --git a/plugins/spark/v3.5/integration/build.gradle.kts b/plugins/spark/v3.5/integration/build.gradle.kts new file mode 100644 index 0000000000..f6fbec508d --- /dev/null +++ b/plugins/spark/v3.5/integration/build.gradle.kts @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +plugins { + alias(libs.plugins.quarkus) + alias(libs.plugins.jandex) + id("polaris-quarkus") +} + +fun getAndUseScalaVersionForProject(): String { + val sparkScala = project.name.split("-").last().split("_") + + val scalaVersion = sparkScala[1] + + // direct the build to build/ to avoid potential collision problem + project.layout.buildDirectory.set(layout.buildDirectory.dir(scalaVersion).get()) + + return scalaVersion +} + +// get version information +val sparkMajorVersion = "3.5" +val scalaVersion = getAndUseScalaVersionForProject() +val icebergVersion = pluginlibs.versions.iceberg.get() +val spark35Version = pluginlibs.versions.spark35.get() + +dependencies { + // must be enforced to get a consistent and validated set of dependencies + implementation(enforcedPlatform(libs.quarkus.bom)) { + exclude(group = "org.antlr", module = "antlr4-runtime") + exclude(group = "org.scala-lang", module = "scala-library") + exclude(group = "org.scala-lang", module = "scala-reflect") + } + + implementation(project(":polaris-quarkus-service")) + implementation(project(":polaris-api-management-model")) + implementation(project(":polaris-spark-${sparkMajorVersion}_${scalaVersion}")) + + implementation("org.apache.spark:spark-sql_${scalaVersion}:${spark35Version}") { + // exclude log4j dependencies + exclude("org.apache.logging.log4j", "log4j-slf4j2-impl") + exclude("org.apache.logging.log4j", "log4j-api") + exclude("org.apache.logging.log4j", "log4j-1.2-api") + exclude("org.slf4j", "jul-to-slf4j") + } + + testImplementation(project(":polaris-tests")) + testImplementation(testFixtures(project(":polaris-quarkus-service"))) + + testImplementation(platform(libs.quarkus.bom)) + testImplementation("io.quarkus:quarkus-junit5") + testImplementation("io.quarkus:quarkus-rest-client") + testImplementation("io.quarkus:quarkus-rest-client-jackson") + + testImplementation(platform(libs.awssdk.bom)) + testImplementation("software.amazon.awssdk:glue") + testImplementation("software.amazon.awssdk:kms") + testImplementation("software.amazon.awssdk:dynamodb") + + testImplementation(platform(libs.testcontainers.bom)) + testImplementation("org.testcontainers:testcontainers") + testImplementation(libs.s3mock.testcontainers) + + // Required for Spark integration tests + testImplementation(enforcedPlatform(libs.scala212.lang.library)) + testImplementation(enforcedPlatform(libs.scala212.lang.reflect)) + testImplementation(libs.javax.servlet.api) + testImplementation(libs.antlr4.runtime) +} + +tasks.named<Test>("intTest").configure { + maxParallelForks = 1 + systemProperty("java.util.logging.manager", "org.jboss.logmanager.LogManager") + if (System.getenv("AWS_REGION") == null) { + environment("AWS_REGION", "us-west-2") + } + // Note: the test secrets are referenced in + // org.apache.polaris.service.quarkus.it.QuarkusServerManager + environment("POLARIS_BOOTSTRAP_CREDENTIALS", "POLARIS,test-admin,test-secret") + jvmArgs("--add-exports", "java.base/sun.nio.ch=ALL-UNNAMED") + // Need to allow a java security manager after Java 21, for Subject.getSubject to work + // "getSubject is supported only if a security manager is allowed". + systemProperty("java.security.manager", "allow") + // Same issue as above: allow a java security manager after Java 21 + // (this setting is for the application under test, while the setting above is for test code).
+ systemProperty("quarkus.test.arg-line", "-Djava.security.manager=allow") + val logsDir = project.layout.buildDirectory.get().asFile.resolve("logs") + // delete files from previous runs + doFirst { + // delete log files written by Polaris + logsDir.deleteRecursively() + // delete quarkus.log file (captured Polaris stdout/stderr) + project.layout.buildDirectory.get().asFile.resolve("quarkus.log").delete() + } + // This property is not honored in a per-profile application.properties file, + // so we need to set it here. + systemProperty("quarkus.log.file.path", logsDir.resolve("polaris.log").absolutePath) + // For Spark integration tests + addSparkJvmOptions() +} + +/** + * Adds the JPMS options required for Spark to run on Java 17, taken from the + * `DEFAULT_MODULE_OPTIONS` constant in `org.apache.spark.launcher.JavaModuleOptions`. + */ +fun JavaForkOptions.addSparkJvmOptions() { + jvmArgs = + (jvmArgs ?: emptyList()) + + listOf( + // Spark 3.3+ + "-XX:+IgnoreUnrecognizedVMOptions", + "--add-opens=java.base/java.lang=ALL-UNNAMED", + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED", + "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED", + "--add-opens=java.base/java.io=ALL-UNNAMED", + "--add-opens=java.base/java.net=ALL-UNNAMED", + "--add-opens=java.base/java.nio=ALL-UNNAMED", + "--add-opens=java.base/java.util=ALL-UNNAMED", + "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED", + "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED", + "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", + "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED", + "--add-opens=java.base/sun.security.action=ALL-UNNAMED", + "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED", + "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED", + // Spark 3.4+ + "-Djdk.reflect.useDirectMethodHandle=false", + ) +} diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIcebergIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIcebergIT.java new file mode 100644 index 0000000000..a2aba78d00 --- /dev/null +++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIcebergIT.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.spark.quarkus.it; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.quarkus.test.junit.QuarkusIntegrationTest; +import org.junit.jupiter.api.Test; + +@QuarkusIntegrationTest +public class SparkIcebergIT extends SparkIntegrationBase { + @Test + public void testNamespaces() { + long namespaceCount = onSpark("SHOW NAMESPACES").count(); + assertThat(namespaceCount).isEqualTo(0L); + + String[] l1NS = new String[] {"l1ns1", "l1ns2"}; + for (String ns : l1NS) { + onSpark(String.format("CREATE NAMESPACE %s", ns)); + } + namespaceCount = onSpark("SHOW NAMESPACES").count(); + assertThat(namespaceCount).isEqualTo(2L); + // TODO: also cover nested namespaces (e.g. CREATE NAMESPACE l1ns1.l2ns1) once the + // expected SHOW NAMESPACES behavior for nested levels is settled. + + // drop the namespaces + for (String ns : l1NS) { + onSpark(String.format("DROP NAMESPACE %s", ns)); + } + namespaceCount = onSpark("SHOW NAMESPACES").count(); + assertThat(namespaceCount).isEqualTo(0L); + } + + @Test + public void testCreateDropView() { + long namespaceCount = onSpark("SHOW NAMESPACES").count(); + assertThat(namespaceCount).isEqualTo(0L); + + String viewName = "test_view"; + onSpark("CREATE NAMESPACE ns1"); + onSpark("USE ns1"); + onSpark(String.format("CREATE VIEW %s AS SELECT 1 AS id", viewName)); + assertThat(onSpark("SHOW VIEWS").count()).isEqualTo(1L); + onSpark(String.format("DROP VIEW %s", viewName)); + assertThat(onSpark("SHOW VIEWS").count()).isEqualTo(0L); + } +} diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java new file mode 100644 index 0000000000..40bd6cb62f --- /dev/null +++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.polaris.spark.quarkus.it; + +import java.util.List; +import org.apache.polaris.service.it.ext.PolarisSparkIntegrationTestBase; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +public class SparkIntegrationBase extends PolarisSparkIntegrationTestBase { + @Override + protected SparkSession.Builder withCatalog(SparkSession.Builder builder, String catalogName) { + return builder + .config( + String.format("spark.sql.catalog.%s", catalogName), + "org.apache.polaris.spark.SparkCatalog") + .config("spark.sql.warehouse.dir", warehouseDir.toString()) + .config(String.format("spark.sql.catalog.%s.type", catalogName), "rest") + .config( + String.format("spark.sql.catalog.%s.uri", catalogName), + endpoints.catalogApiEndpoint().toString()) + .config(String.format("spark.sql.catalog.%s.warehouse", catalogName), catalogName) + .config(String.format("spark.sql.catalog.%s.scope", catalogName), "PRINCIPAL_ROLE:ALL") + .config( + String.format("spark.sql.catalog.%s.header.realm", catalogName), endpoints.realmId()) + .config(String.format("spark.sql.catalog.%s.token", catalogName), sparkToken) + .config(String.format("spark.sql.catalog.%s.s3.access-key-id", catalogName), "fakekey") + .config( + String.format("spark.sql.catalog.%s.s3.secret-access-key", catalogName), "fakesecret") + .config(String.format("spark.sql.catalog.%s.s3.region", catalogName), "us-west-2"); + } + + @Override + protected void cleanupCatalog(String catalogName) { + onSpark("USE " + catalogName); + List<Row> namespaces = onSpark("SHOW NAMESPACES").collectAsList(); + for (Row namespace : namespaces) { + // TODO: once all table operations are supported, remove the override of this function + // List<Row> tables = onSpark("SHOW TABLES IN " + namespace.getString(0)).collectAsList(); + // for (Row table : tables) { + // onSpark("DROP TABLE " + namespace.getString(0) + "." + table.getString(1)); + // } + List<Row> views = onSpark("SHOW VIEWS IN " + namespace.getString(0)).collectAsList(); + for (Row view : views) { + onSpark("DROP VIEW " + namespace.getString(0) + "." + view.getString(1)); + } + onSpark("DROP NAMESPACE " + namespace.getString(0)); + } + + managementApi.deleteCatalog(catalogName); + } +} diff --git a/plugins/spark/v3.5/integration/src/intTest/resources/META-INF/services/org.apache.polaris.service.it.ext.PolarisServerManager b/plugins/spark/v3.5/integration/src/intTest/resources/META-INF/services/org.apache.polaris.service.it.ext.PolarisServerManager new file mode 100644 index 0000000000..a8615d99d4 --- /dev/null +++ b/plugins/spark/v3.5/integration/src/intTest/resources/META-INF/services/org.apache.polaris.service.it.ext.PolarisServerManager @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License.
+# + +org.apache.polaris.service.quarkus.it.QuarkusServerManager \ No newline at end of file diff --git a/plugins/spark/v3.5/build.gradle.kts b/plugins/spark/v3.5/spark/build.gradle.kts similarity index 99% rename from plugins/spark/v3.5/build.gradle.kts rename to plugins/spark/v3.5/spark/build.gradle.kts index df37fa229e..505d0f49d2 100644 --- a/plugins/spark/v3.5/build.gradle.kts +++ b/plugins/spark/v3.5/spark/build.gradle.kts @@ -141,6 +141,7 @@ tasks.register("createPolarisSparkJar") { mergeServiceFiles() // pack both the source code and dependencies + from(sourceSets.main.get().output) configurations = listOf(project.configurations.runtimeClasspath.get()) diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisCatalog.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisCatalog.java similarity index 100% rename from plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisCatalog.java rename to plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisCatalog.java diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java similarity index 100% rename from plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java rename to plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java similarity index 100% rename from plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java rename to plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java similarity index 100% rename from plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java rename to plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/CreateGenericTableRESTRequest.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/rest/CreateGenericTableRESTRequest.java similarity index 100% rename from plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/CreateGenericTableRESTRequest.java rename to plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/rest/CreateGenericTableRESTRequest.java diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/LoadGenericTableRESTResponse.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/rest/LoadGenericTableRESTResponse.java similarity index 100% rename from plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/LoadGenericTableRESTResponse.java rename to plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/rest/LoadGenericTableRESTResponse.java diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java similarity index 100% rename from plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java rename to plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java diff 
--git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java similarity index 100% rename from plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java rename to plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java diff --git a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java similarity index 100% rename from plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java rename to plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java diff --git a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/PolarisInMemoryCatalog.java b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/PolarisInMemoryCatalog.java similarity index 100% rename from plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/PolarisInMemoryCatalog.java rename to plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/PolarisInMemoryCatalog.java diff --git a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java similarity index 100% rename from plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java rename to plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java diff --git a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java similarity index 100% rename from plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java rename to plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java diff --git a/quarkus/service/src/testFixtures/java/org/apache/polaris/service/quarkus/it/QuarkusServerManager.java b/quarkus/service/src/testFixtures/java/org/apache/polaris/service/quarkus/it/QuarkusServerManager.java index a9f16c2077..20ae038414 100644 --- a/quarkus/service/src/testFixtures/java/org/apache/polaris/service/quarkus/it/QuarkusServerManager.java +++ b/quarkus/service/src/testFixtures/java/org/apache/polaris/service/quarkus/it/QuarkusServerManager.java @@ -44,6 +44,7 @@ public ClientPrincipal adminCredentials() { return new ClientPrincipal("root", new ClientCredentials("test-admin", "test-secret")); } + @Override public void close() { // Nothing to do diff --git a/settings.gradle.kts b/settings.gradle.kts index 3d658130d8..1de9230438 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -71,12 +71,21 @@ for (sparkVersion in sparkVersions) { val scalaVersions = sparkScalaVersions["scalaVersions"].toString().split(",").map { it.trim() } var first = true for (scalaVersion in scalaVersions) { - val artifactId = "polaris-spark-${sparkVersion}_${scalaVersion}" - polarisProject(artifactId, file("${polarisSparkDir}/v${sparkVersion}")) + val sparkArtifactId = "polaris-spark-${sparkVersion}_${scalaVersion}" + val sparkIntArtifactId = "polaris-spark-integration-${sparkVersion}_${scalaVersion}" + polarisProject( + "polaris-spark-${sparkVersion}_${scalaVersion}", + file("${polarisSparkDir}/v${sparkVersion}/spark"), + ) + polarisProject( + 
"polaris-spark-integration-${sparkVersion}_${scalaVersion}", + file("${polarisSparkDir}/v${sparkVersion}/integration"), + ) if (first) { first = false } else { - noSourceChecksProjects.add(":$artifactId") + noSourceChecksProjects.add(":$sparkArtifactId") + noSourceChecksProjects.add(":$sparkIntArtifactId") } // Skip all duplicated spark client projects while using Intelij IDE. // This is to avoid problems during dependency analysis and sync when From d285a0a96d31c3b21d3e86ca2244002f6d5f2a38 Mon Sep 17 00:00:00 2001 From: Yun Zou Date: Wed, 9 Apr 2025 16:39:27 -0700 Subject: [PATCH 2/7] add change --- build-logic/src/main/kotlin/Utilities.kt | 61 +++++ .../ext/PolarisSparkIntegrationTestBase.java | 13 +- .../it/test/PolarisSparkIntegrationTest.java | 28 -- .../spark/v3.5/integration/build.gradle.kts | 60 ++--- .../quarkus/it/SparkCatalogOperationsIT.java | 245 ++++++++++++++++++ .../quarkus/it/SparkIcebergCatalogIT.java | 109 ++++++++ .../spark/quarkus/it/SparkIcebergIT.java | 60 ----- .../quarkus/it/SparkIntegrationBase.java | 54 +++- plugins/spark/v3.5/spark/build.gradle.kts | 11 - .../quarkus/it/QuarkusServerManager.java | 1 - quarkus/spark-tests/build.gradle.kts | 29 --- 11 files changed, 486 insertions(+), 185 deletions(-) create mode 100644 build-logic/src/main/kotlin/Utilities.kt create mode 100644 plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogOperationsIT.java create mode 100644 plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIcebergCatalogIT.java delete mode 100644 plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIcebergIT.java diff --git a/build-logic/src/main/kotlin/Utilities.kt b/build-logic/src/main/kotlin/Utilities.kt new file mode 100644 index 0000000000..52d0c7c55f --- /dev/null +++ b/build-logic/src/main/kotlin/Utilities.kt @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.gradle.api.Project +import org.gradle.process.JavaForkOptions + +fun Project.getAndUseScalaVersionForProject(): String { + val sparkScala = project.name.split("-").last().split("_") + + val scalaVersion = sparkScala[1] + + // direct the build to build/ to avoid potential collision problem + project.layout.buildDirectory.set(layout.buildDirectory.dir(scalaVersion).get()) + + return scalaVersion +} + +/** + * Adds the JPMS options required for Spark to run on Java 17, taken from the + * `DEFAULT_MODULE_OPTIONS` constant in `org.apache.spark.launcher.JavaModuleOptions`. 
+ */ +fun JavaForkOptions.addSparkJvmOptions() { + jvmArgs = + (jvmArgs ?: emptyList()) + + listOf( + // Spark 3.3+ + "-XX:+IgnoreUnrecognizedVMOptions", + "--add-opens=java.base/java.lang=ALL-UNNAMED", + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED", + "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED", + "--add-opens=java.base/java.io=ALL-UNNAMED", + "--add-opens=java.base/java.net=ALL-UNNAMED", + "--add-opens=java.base/java.nio=ALL-UNNAMED", + "--add-opens=java.base/java.util=ALL-UNNAMED", + "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED", + "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED", + "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", + "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED", + "--add-opens=java.base/sun.security.action=ALL-UNNAMED", + "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED", + "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED", + // Spark 3.4+ + "-Djdk.reflect.useDirectMethodHandle=false", + ) +} diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/ext/PolarisSparkIntegrationTestBase.java b/integration-tests/src/main/java/org/apache/polaris/service/it/ext/PolarisSparkIntegrationTestBase.java index c2ec2021f6..670d39a7da 100644 --- a/integration-tests/src/main/java/org/apache/polaris/service/it/ext/PolarisSparkIntegrationTestBase.java +++ b/integration-tests/src/main/java/org/apache/polaris/service/it/ext/PolarisSparkIntegrationTestBase.java @@ -18,23 +18,14 @@ */ package org.apache.polaris.service.it.ext; -import org.junit.jupiter.api.extension.ExtendWith; import static org.apache.polaris.service.it.env.PolarisClient.polarisClient; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; import com.adobe.testing.s3mock.testcontainers.S3MockContainer; -import com.google.common.collect.ImmutableMap; -import jakarta.ws.rs.client.Entity; -import jakarta.ws.rs.core.Response; import java.io.IOException; import java.net.URI; import java.nio.file.Path; -import java.time.Instant; import java.util.List; import java.util.Map; -import org.apache.iceberg.rest.requests.ImmutableRegisterTableRequest; -import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.polaris.core.admin.model.AwsStorageConfigInfo; import org.apache.polaris.core.admin.model.Catalog; import org.apache.polaris.core.admin.model.CatalogProperties; @@ -47,7 +38,6 @@ import org.apache.polaris.service.it.env.ManagementApi; import org.apache.polaris.service.it.env.PolarisApiEndpoints; import org.apache.polaris.service.it.env.PolarisClient; -import org.apache.polaris.service.it.ext.PolarisIntegrationTestExtension; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; @@ -56,7 +46,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; import org.slf4j.LoggerFactory; @@ -243,4 +232,4 @@ protected void cleanupCatalog(String catalogName) { protected static Dataset<Row> onSpark(@Language("SQL") String sql) { return spark.sql(sql); } -} \ No newline at end of file +} diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisSparkIntegrationTest.java b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisSparkIntegrationTest.java index 415df8839a..2bdd109281 100644
--- a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisSparkIntegrationTest.java +++ b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisSparkIntegrationTest.java @@ -18,48 +18,20 @@ */ package org.apache.polaris.service.it.test; -import static org.apache.polaris.service.it.env.PolarisClient.polarisClient; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import com.adobe.testing.s3mock.testcontainers.S3MockContainer; import com.google.common.collect.ImmutableMap; import jakarta.ws.rs.client.Entity; import jakarta.ws.rs.core.Response; -import java.io.IOException; -import java.net.URI; -import java.nio.file.Path; import java.time.Instant; import java.util.List; import java.util.Map; import org.apache.iceberg.rest.requests.ImmutableRegisterTableRequest; import org.apache.iceberg.rest.responses.LoadTableResponse; -import org.apache.polaris.core.admin.model.AwsStorageConfigInfo; -import org.apache.polaris.core.admin.model.Catalog; -import org.apache.polaris.core.admin.model.CatalogProperties; -import org.apache.polaris.core.admin.model.ExternalCatalog; -import org.apache.polaris.core.admin.model.PolarisCatalog; -import org.apache.polaris.core.admin.model.StorageConfigInfo; -import org.apache.polaris.service.it.env.CatalogApi; -import org.apache.polaris.service.it.env.ClientCredentials; -import org.apache.polaris.service.it.env.IntegrationTestsHelper; -import org.apache.polaris.service.it.env.ManagementApi; -import org.apache.polaris.service.it.env.PolarisApiEndpoints; -import org.apache.polaris.service.it.env.PolarisClient; -import org.apache.polaris.service.it.ext.PolarisIntegrationTestExtension; import org.apache.polaris.service.it.ext.PolarisSparkIntegrationTestBase; -import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import org.intellij.lang.annotations.Language; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.api.io.TempDir; -import org.slf4j.LoggerFactory; /** * @implSpec This test expects the server to be configured with the following features enabled: diff --git a/plugins/spark/v3.5/integration/build.gradle.kts b/plugins/spark/v3.5/integration/build.gradle.kts index f6fbec508d..e4611feb37 100644 --- a/plugins/spark/v3.5/integration/build.gradle.kts +++ b/plugins/spark/v3.5/integration/build.gradle.kts @@ -23,22 +23,17 @@ plugins { id("polaris-quarkus") } -fun getAndUseScalaVersionForProject(): String { - val sparkScala = project.name.split("-").last().split("_") - - val scalaVersion = sparkScala[1] - - // direct the build to build/ to avoid potential collision problem - project.layout.buildDirectory.set(layout.buildDirectory.dir(scalaVersion).get()) - - return scalaVersion -} - // get version information val sparkMajorVersion = "3.5" val scalaVersion = getAndUseScalaVersionForProject() val icebergVersion = pluginlibs.versions.iceberg.get() val spark35Version = pluginlibs.versions.spark35.get() +val scalaLibraryVersion = + if (scalaVersion == "2.12") { + pluginlibs.versions.scala212.get() + } else { + pluginlibs.versions.scala213.get() + } dependencies { // must be enforced to get a consistent and validated set of dependencies @@ -60,7 +55,15 @@ dependencies { 
exclude("org.slf4j", "jul-to-slf4j") } - testImplementation(project(":polaris-tests")) + implementation(platform(libs.jackson.bom)) + implementation("com.fasterxml.jackson.core:jackson-annotations") + implementation("com.fasterxml.jackson.core:jackson-core") + implementation("com.fasterxml.jackson.core:jackson-databind") + + implementation( + "org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}:${icebergVersion}" + ) + testImplementation(testFixtures(project(":polaris-quarkus-service"))) testImplementation(platform(libs.quarkus.bom)) @@ -78,8 +81,8 @@ dependencies { testImplementation(libs.s3mock.testcontainers) // Required for Spark integration tests - testImplementation(enforcedPlatform(libs.scala212.lang.library)) - testImplementation(enforcedPlatform(libs.scala212.lang.reflect)) + testImplementation(enforcedPlatform("org.scala-lang:scala-library:${scalaLibraryVersion}")) + testImplementation(enforcedPlatform("org.scala-lang:scala-reflect:${scalaLibraryVersion}")) testImplementation(libs.javax.servlet.api) testImplementation(libs.antlr4.runtime) } @@ -114,32 +117,3 @@ tasks.named("intTest").configure { // For Spark integration tests addSparkJvmOptions() } - -/** - * Adds the JPMS options required for Spark to run on Java 17, taken from the - * `DEFAULT_MODULE_OPTIONS` constant in `org.apache.spark.launcher.JavaModuleOptions`. - */ -fun JavaForkOptions.addSparkJvmOptions() { - jvmArgs = - (jvmArgs ?: emptyList()) + - listOf( - // Spark 3.3+ - "-XX:+IgnoreUnrecognizedVMOptions", - "--add-opens=java.base/java.lang=ALL-UNNAMED", - "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED", - "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED", - "--add-opens=java.base/java.io=ALL-UNNAMED", - "--add-opens=java.base/java.net=ALL-UNNAMED", - "--add-opens=java.base/java.nio=ALL-UNNAMED", - "--add-opens=java.base/java.util=ALL-UNNAMED", - "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED", - "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED", - "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", - "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED", - "--add-opens=java.base/sun.security.action=ALL-UNNAMED", - "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED", - "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED", - // Spark 3.4+ - "-Djdk.reflect.useDirectMethodHandle=false", - ) -} diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogOperationsIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogOperationsIT.java new file mode 100644 index 0000000000..f60d5d757a --- /dev/null +++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogOperationsIT.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.spark.quarkus.it; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.google.common.collect.Maps; +import io.quarkus.test.junit.QuarkusIntegrationTest; +import java.util.Arrays; +import java.util.Map; +import org.apache.iceberg.exceptions.BadRequestException; +import org.apache.polaris.spark.SparkCatalog; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.NoSuchViewException; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.NamespaceChange; +import org.apache.spark.sql.connector.catalog.View; +import org.apache.spark.sql.connector.catalog.ViewChange; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.Test; + +@QuarkusIntegrationTest +public class SparkCatalogOperationsIT extends SparkIntegrationBase { + private static StructType schema = new StructType().add("id", "long").add("name", "string"); + + @Test + void testNamespaceOperations() throws Exception { + SparkCatalog catalog = loadSparkCatalog(); + + String[][] lv1ns = new String[][] {{"l1ns1"}, {"l1ns2"}}; + String[][] lv2ns1 = new String[][] {{"l1ns1", "l2ns1"}, {"l1ns1", "l2ns2"}}; + String[][] lv2ns2 = new String[][] {{"l1ns2", "l2ns3"}}; + + // create the namespaces + for (String[] namespace : lv1ns) { + catalog.createNamespace(namespace, Maps.newHashMap()); + } + for (String[] namespace : lv2ns1) { + catalog.createNamespace(namespace, Maps.newHashMap()); + } + for (String[] namespace : lv2ns2) { + catalog.createNamespace(namespace, Maps.newHashMap()); + } + + // list namespaces under root + String[][] lv1nsResult = catalog.listNamespaces(); + assertThat(lv1nsResult.length).isEqualTo(lv1ns.length); + for (String[] namespace : lv1ns) { + assertThat(Arrays.asList(lv1nsResult)).contains(namespace); + } + // list namespace under l1ns1 + String[][] lv2ns1Result = catalog.listNamespaces(lv1ns[0]); + assertThat(lv2ns1Result.length).isEqualTo(lv2ns1.length); + for (String[] namespace : lv2ns1) { + assertThat(Arrays.asList(lv2ns1Result)).contains(namespace); + } + // list namespace under l1ns2 + String[][] lv2ns2Result = catalog.listNamespaces(lv1ns[1]); + assertThat(lv2ns2Result.length).isEqualTo(lv2ns2.length); + for (String[] namespace : lv2ns2) { + assertThat(Arrays.asList(lv2ns2Result)).contains(namespace); + } + // no namespace under l1ns2.l2ns3 + assertThat(catalog.listNamespaces(lv2ns2[0]).length).isEqualTo(0); + + // drop the nested namespace under lv1ns[1] + catalog.dropNamespace(lv2ns2[0], true); + assertThat(catalog.listNamespaces(lv1ns[1]).length).isEqualTo(0); + catalog.dropNamespace(lv1ns[1], true); + assertThatThrownBy(() -> catalog.listNamespaces(lv1ns[1])) + .isInstanceOf(NoSuchNamespaceException.class); + + // directly drop lv1ns[0] should fail + assertThatThrownBy(() -> catalog.dropNamespace(lv1ns[0], true)) + .isInstanceOf(BadRequestException.class); + for (String[] namespace : lv2ns1) { + catalog.dropNamespace(namespace, true); + } + catalog.dropNamespace(lv1ns[0], true); + + // no more namespace available + assertThat(catalog.listNamespaces().length).isEqualTo(0); + } + + @Test + void testAlterNamespace() throws Exception { + SparkCatalog catalog = loadSparkCatalog(); + String[] namespace = new String[] {"ns1"}; + Map metadata = 
Maps.newHashMap(); + metadata.put("owner", "user1"); + + catalog.createNamespace(namespace, metadata); + assertThat(catalog.loadNamespaceMetadata(namespace)).contains(Map.entry("owner", "user1")); + + catalog.alterNamespace(namespace, NamespaceChange.setProperty("owner", "new-user")); + assertThat(catalog.loadNamespaceMetadata(namespace)).contains(Map.entry("owner", "new-user")); + + // drop the namespace + catalog.dropNamespace(namespace, true); + } + + @Test + void testBasicViewOperations() throws Exception { + SparkCatalog catalog = loadSparkCatalog(); + String[] namespace = new String[] {"ns"}; + catalog.createNamespace(namespace, Maps.newHashMap()); + + Identifier viewIdentifier = Identifier.of(namespace, "test-view"); + String viewSql = "select id from test-table where id < 3"; + catalog.createView( + viewIdentifier, + viewSql, + catalogName, + namespace, + schema, + new String[0], + new String[0], + new String[0], + Maps.newHashMap()); + + // load the view + View view = catalog.loadView(viewIdentifier); + assertThat(view.query()).isEqualTo(viewSql); + assertThat(view.schema()).isEqualTo(schema); + + // alter the view properties + catalog.alterView(viewIdentifier, ViewChange.setProperty("owner", "user1")); + view = catalog.loadView(viewIdentifier); + assertThat(view.properties()).contains(Map.entry("owner", "user1")); + + // rename the view + Identifier newIdentifier = Identifier.of(namespace, "new-view"); + catalog.renameView(viewIdentifier, newIdentifier); + assertThatThrownBy(() -> catalog.loadView(viewIdentifier)) + .isInstanceOf(NoSuchViewException.class); + view = catalog.loadView(newIdentifier); + assertThat(view.query()).isEqualTo(viewSql); + assertThat(view.schema()).isEqualTo(schema); + + // replace the view + String newSql = "select id from test-table where id == 3"; + Map<String, String> properties = Maps.newHashMap(); + properties.put("owner", "test-user"); + catalog.replaceView( + newIdentifier, + newSql, + catalogName, + namespace, + schema, + new String[0], + new String[0], + new String[0], + properties); + view = catalog.loadView(newIdentifier); + assertThat(view.query()).isEqualTo(newSql); + assertThat(view.properties()).contains(Map.entry("owner", "test-user")); + + // drop the view + catalog.dropView(newIdentifier); + assertThatThrownBy(() -> catalog.loadView(newIdentifier)) + .isInstanceOf(NoSuchViewException.class); + } + + @Test + void testListViews() throws Exception { + SparkCatalog catalog = loadSparkCatalog(); + + String[] l1ns = new String[] {"ns"}; + catalog.createNamespace(l1ns, Maps.newHashMap()); + + // create a new namespace under the default NS + String[] l2ns = new String[] {"ns", "nsl2"}; + catalog.createNamespace(l2ns, Maps.newHashMap()); + // create one view under l1 + String view1Name = "test-view1"; + String view1SQL = "select id from test-table where id >= 3"; + catalog.createView( + Identifier.of(l1ns, view1Name), + view1SQL, + catalogName, + l1ns, + schema, + new String[0], + new String[0], + new String[0], + Maps.newHashMap()); + // create two views under the l2 namespace + String[] nsl2ViewNames = new String[] {"test-view2", "test-view3"}; + String[] nsl2ViewSQLs = + new String[] { + "select id from test-table where id == 3", "select id from test-table where id < 3" + }; + for (int i = 0; i < nsl2ViewNames.length; i++) { + catalog.createView( + Identifier.of(l2ns, nsl2ViewNames[i]), + nsl2ViewSQLs[i], + catalogName, + l2ns, + schema, + new String[0], + new String[0], + new String[0], + Maps.newHashMap()); + } + // list views under l1ns + Identifier[]
l1Views = catalog.listViews(l1ns); + assertThat(l1Views.length).isEqualTo(1); + assertThat(l1Views[0].name()).isEqualTo(view1Name); + + // list views under l2ns + Identifier[] l2Views = catalog.listViews(l2ns); + assertThat(l2Views.length).isEqualTo(nsl2ViewSQLs.length); + for (String name : nsl2ViewNames) { + assertThat(Arrays.asList(l2Views)).contains(Identifier.of(l2ns, name)); + } + + // drop namespace fails since there are views under it + assertThatThrownBy(() -> catalog.dropNamespace(l2ns, true)) + .isInstanceOf(BadRequestException.class); + // drop the views + for (String name : nsl2ViewNames) { + catalog.dropView(Identifier.of(l2ns, name)); + } + catalog.dropNamespace(l2ns, true); + catalog.dropView(Identifier.of(l1ns, view1Name)); + catalog.dropNamespace(l1ns, true); + } +} diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIcebergCatalogIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIcebergCatalogIT.java new file mode 100644 index 0000000000..356e57a81a --- /dev/null +++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIcebergCatalogIT.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.spark.quarkus.it; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.quarkus.test.junit.QuarkusIntegrationTest; +import java.util.List; +import org.junit.jupiter.api.Test; + +@QuarkusIntegrationTest +public class SparkIcebergCatalogIT extends SparkIntegrationBase { + @Test + public void testNamespaces() { + List<Object[]> namespaces = sql("SHOW NAMESPACES"); + assertThat(namespaces.size()).isEqualTo(0); + + String[] l1NS = new String[] {"l1ns1", "l1ns2"}; + for (String ns : l1NS) { + sql("CREATE NAMESPACE %s", ns); + } + namespaces = sql("SHOW NAMESPACES"); + assertThat(namespaces.size()).isEqualTo(2); + for (String ns : l1NS) { + assertThat(namespaces).contains(new Object[] {ns}); + } + String l2ns = "l2ns"; + // create a nested namespace + sql("CREATE NAMESPACE %s.%s", l1NS[0], l2ns); + // spark SHOW NAMESPACES only shows the top-level namespaces + namespaces = sql("SHOW NAMESPACES"); + assertThat(namespaces.size()).isEqualTo(2); + + // cannot drop l1NS before the nested namespace is dropped + assertThatThrownBy(() -> sql("DROP NAMESPACE %s", l1NS[0])) + .hasMessageContaining(String.format("Namespace %s is not empty", l1NS[0])); + sql("DROP NAMESPACE %s.%s", l1NS[0], l2ns); + + for (String ns : l1NS) { + sql("DROP NAMESPACE %s", ns); + } + + // no namespace available after all drops + namespaces = sql("SHOW NAMESPACES"); + assertThat(namespaces.size()).isEqualTo(0); + } + + @Test + public void testCreateDropView() { + String namespace = "ns"; + // create namespace ns + sql("CREATE NAMESPACE %s", namespace); + sql("USE %s", namespace); + + // create two views under the namespace + String view1Name = "testView1"; + String view2Name = "testView2"; + sql("CREATE VIEW %s AS SELECT 1 AS id", view1Name); + sql("CREATE VIEW %s AS SELECT 10 AS id", view2Name); + List<Object[]> views = sql("SHOW VIEWS"); + assertThat(views.size()).isEqualTo(2); + assertThat(views).contains(new Object[] {namespace, view1Name, false}); + assertThat(views).contains(new Object[] {namespace, view2Name, false}); + + // drop the views + sql("DROP VIEW %s", view1Name); + views = sql("SHOW VIEWS"); + assertThat(views.size()).isEqualTo(1); + assertThat(views).contains(new Object[] {namespace, view2Name, false}); + + sql("DROP VIEW %s", view2Name); + views = sql("SHOW VIEWS"); + assertThat(views.size()).isEqualTo(0); + } + + @Test + public void renameView() { + sql("CREATE NAMESPACE ns"); + sql("USE ns"); + + String viewName = "originalView"; + String renamedView = "renamedView"; + sql("CREATE VIEW %s AS SELECT 1 AS id", viewName); + List<Object[]> views = sql("SHOW VIEWS"); + assertThat(views).contains(new Object[] {"ns", viewName, false}); + + // TODO: update this test once table related operations are supported + // spark view rename triggers a loadTable underneath + assertThatThrownBy(() -> sql("ALTER VIEW %s RENAME TO %s", viewName, renamedView)) + .isInstanceOf(UnsupportedOperationException.class); + } +} diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIcebergIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIcebergIT.java deleted file mode 100644 index a2aba78d00..0000000000 --- a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIcebergIT.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements.
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.spark.quarkus.it; - -import static org.assertj.core.api.Assertions.assertThat; - -import io.quarkus.test.junit.QuarkusIntegrationTest; -import org.junit.jupiter.api.Test; - -@QuarkusIntegrationTest -public class SparkIcebergIT extends SparkIntegrationBase { - @Test - public void testNamespaces() { - long namespaceCount = onSpark("SHOW NAMESPACES").count(); - assertThat(namespaceCount).isEqualTo(0L); - - String[] l1NS = new String[] {"l1ns1", "l1ns2"}; - for (String ns : l1NS) { - onSpark(String.format("CREATE NAMESPACE %s", ns)); - } - namespaceCount = onSpark("SHOW NAMESPACES").count(); - assertThat(namespaceCount).isEqualTo(2L); - // String[] l2NS = new String[] {"l2ns1"}; - // onSpark(String.format("CREATE NAMESPACE %s.%s", l1NS[0], "l2ns1")); - // namespaceCount = onSpark("SHOW NAMESPACES").count(); - // assertThat(namespaceCount).isEqualTo(2L); - - // drop the namespace - - } - - @Test - public void testCreatDropView() { - long namespaceCount = onSpark("SHOW NAMESPACES").count(); - assertThat(namespaceCount).isEqualTo(0L); - - String viewName = "test_view"; - onSpark("CREATE NAMESPACE ns1"); - onSpark("USE ns1"); - onSpark(String.format("CREATE VIEW %s AS SELECT 1 AS id", viewName)); - onSpark("SHOW VIEWS"); - onSpark(String.format("DROP VIEW %s", viewName)); - } -} diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java index 40bd6cb62f..ae4ebe53b8 100644 --- a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java +++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java @@ -18,12 +18,20 @@ */ package org.apache.polaris.spark.quarkus.it; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.errorprone.annotations.FormatMethod; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.polaris.service.it.ext.PolarisSparkIntegrationTestBase; +import org.apache.polaris.spark.SparkCatalog; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.catalog.CatalogPlugin; + +public abstract class SparkIntegrationBase extends PolarisSparkIntegrationTestBase { -public class SparkIntegrationBase extends PolarisSparkIntegrationTestBase { @Override protected SparkSession.Builder withCatalog(SparkSession.Builder builder, String catalogName) { return builder @@ -65,4 +73,48 @@ protected void cleanupCatalog(String catalogName) { managementApi.deleteCatalog(catalogName); } + + @FormatMethod + protected List<Object[]> sql(String
query, Object... args) { + List<Row> rows = spark.sql(String.format(query, args)).collectAsList(); + if (rows.isEmpty()) { + return ImmutableList.of(); + } + return rowsToJava(rows); + } + + protected List<Object[]> rowsToJava(List<Row> rows) { + return rows.stream().map(this::toJava).collect(Collectors.toList()); + } + + private Object[] toJava(Row row) { + return IntStream.range(0, row.size()) + .mapToObj( + pos -> { + if (row.isNullAt(pos)) { + return null; + } + + Object value = row.get(pos); + if (value instanceof Row) { + return toJava((Row) value); + } else if (value instanceof scala.collection.Seq) { + return row.getList(pos); + } else if (value instanceof scala.collection.Map) { + return row.getJavaMap(pos); + } else { + return value; + } + }) + .toArray(Object[]::new); + } + + protected SparkCatalog loadSparkCatalog() { + Preconditions.checkArgument(spark != null, "No active spark found"); + Preconditions.checkArgument(catalogName != null, "No catalogName found"); + CatalogPlugin catalogPlugin = spark.sessionState().catalogManager().catalog(catalogName); + Preconditions.checkArgument( + catalogPlugin instanceof SparkCatalog, "The Spark Catalog is not a Polaris SparkCatalog"); + return (SparkCatalog) catalogPlugin; + } } diff --git a/plugins/spark/v3.5/spark/build.gradle.kts b/plugins/spark/v3.5/spark/build.gradle.kts index 505d0f49d2..ddf27ce1f9 100644 --- a/plugins/spark/v3.5/spark/build.gradle.kts +++ b/plugins/spark/v3.5/spark/build.gradle.kts @@ -24,17 +24,6 @@ plugins { alias(libs.plugins.jandex) } -fun getAndUseScalaVersionForProject(): String { - val sparkScala = project.name.split("-").last().split("_") - - val scalaVersion = sparkScala[1] - - // direct the build to build/ to avoid potential collision problem - project.layout.buildDirectory.set(layout.buildDirectory.dir(scalaVersion).get()) - - return scalaVersion -} - // get version information val sparkMajorVersion = "3.5" val scalaVersion = getAndUseScalaVersionForProject() diff --git a/quarkus/service/src/testFixtures/java/org/apache/polaris/service/quarkus/it/QuarkusServerManager.java b/quarkus/service/src/testFixtures/java/org/apache/polaris/service/quarkus/it/QuarkusServerManager.java index 20ae038414..a9f16c2077 100644 --- a/quarkus/service/src/testFixtures/java/org/apache/polaris/service/quarkus/it/QuarkusServerManager.java +++ b/quarkus/service/src/testFixtures/java/org/apache/polaris/service/quarkus/it/QuarkusServerManager.java @@ -44,7 +44,6 @@ public ClientPrincipal adminCredentials() { return new ClientPrincipal("root", new ClientCredentials("test-admin", "test-secret")); } - @Override public void close() { // Nothing to do diff --git a/quarkus/spark-tests/build.gradle.kts b/quarkus/spark-tests/build.gradle.kts index 18cc9d585f..0e83a7f279 100644 --- a/quarkus/spark-tests/build.gradle.kts +++ b/quarkus/spark-tests/build.gradle.kts @@ -88,32 +88,3 @@ tasks.named("intTest").configure { // For Spark integration tests addSparkJvmOptions() } - -/** - * Adds the JPMS options required for Spark to run on Java 17, taken from the - * `DEFAULT_MODULE_OPTIONS` constant in `org.apache.spark.launcher.JavaModuleOptions`.
- */ -fun JavaForkOptions.addSparkJvmOptions() { - jvmArgs = - (jvmArgs ?: emptyList()) + - listOf( - // Spark 3.3+ - "-XX:+IgnoreUnrecognizedVMOptions", - "--add-opens=java.base/java.lang=ALL-UNNAMED", - "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED", - "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED", - "--add-opens=java.base/java.io=ALL-UNNAMED", - "--add-opens=java.base/java.net=ALL-UNNAMED", - "--add-opens=java.base/java.nio=ALL-UNNAMED", - "--add-opens=java.base/java.util=ALL-UNNAMED", - "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED", - "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED", - "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", - "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED", - "--add-opens=java.base/sun.security.action=ALL-UNNAMED", - "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED", - "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED", - // Spark 3.4+ - "-Djdk.reflect.useDirectMethodHandle=false", - ) -} From 828730af4b9457a8098e385d4db5a798ae7bf69b Mon Sep 17 00:00:00 2001 From: Yun Zou Date: Wed, 9 Apr 2025 17:00:56 -0700 Subject: [PATCH 3/7] add comments --- build-logic/src/main/kotlin/Utilities.kt | 5 +++++ .../polaris/spark/quarkus/it/SparkCatalogOperationsIT.java | 5 +++++ .../org.apache.polaris.service.it.ext.PolarisServerManager | 2 +- 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/build-logic/src/main/kotlin/Utilities.kt b/build-logic/src/main/kotlin/Utilities.kt index 52d0c7c55f..6c235bf398 100644 --- a/build-logic/src/main/kotlin/Utilities.kt +++ b/build-logic/src/main/kotlin/Utilities.kt @@ -20,6 +20,11 @@ import org.gradle.api.Project import org.gradle.process.JavaForkOptions +/** + * Extracts the scala version from the polaris spark project name, and points the build directory to + * a sub-dir named after the scala version. The polaris spark project name is in the format of + * <project>-<sparkMajorVersion>_<scalaVersion>, for example: polaris-spark-3.5_2.12. + */ fun Project.getAndUseScalaVersionForProject(): String { val sparkScala = project.name.split("-").last().split("_") diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogOperationsIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogOperationsIT.java index f60d5d757a..ed6f798af2 100644 --- a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogOperationsIT.java +++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogOperationsIT.java @@ -38,6 +38,11 @@ @QuarkusIntegrationTest public class SparkCatalogOperationsIT extends SparkIntegrationBase { + /** + * This integration directly performs operations using the SparkCatalog instance, instead of going + * through Spark SQL interface. Some operations don't have an easy way to trigger through the + * SparkSQL, for example, list nested namespaces.
+ */ private static StructType schema = new StructType().add("id", "long").add("name", "string"); @Test diff --git a/plugins/spark/v3.5/integration/src/intTest/resources/META-INF/services/org.apache.polaris.service.it.ext.PolarisServerManager b/plugins/spark/v3.5/integration/src/intTest/resources/META-INF/services/org.apache.polaris.service.it.ext.PolarisServerManager index a8615d99d4..3c3881857b 100644 --- a/plugins/spark/v3.5/integration/src/intTest/resources/META-INF/services/org.apache.polaris.service.it.ext.PolarisServerManager +++ b/plugins/spark/v3.5/integration/src/intTest/resources/META-INF/services/org.apache.polaris.service.it.ext.PolarisServerManager @@ -17,4 +17,4 @@ # under the License. # -org.apache.polaris.service.quarkus.it.QuarkusServerManager \ No newline at end of file +org.apache.polaris.service.quarkus.it.QuarkusServerManager From 9d454304d01e72391c1c8e46931ac753307fa76c Mon Sep 17 00:00:00 2001 From: Yun Zou Date: Fri, 11 Apr 2025 18:15:42 -0700 Subject: [PATCH 4/7] rebase main --- ...gOperationsIT.java => PolarisSparkCatalogIT.java} | 12 ++++++------ ...parkIcebergCatalogIT.java => SparkIcebergIT.java} | 11 ++++++----- .../spark/quarkus/it/SparkIntegrationBase.java | 3 +++ settings.gradle.kts | 8 ++++---- 4 files changed, 19 insertions(+), 15 deletions(-) rename plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/{SparkCatalogOperationsIT.java => PolarisSparkCatalogIT.java} (96%) rename plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/{SparkIcebergCatalogIT.java => SparkIcebergIT.java} (91%) diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogOperationsIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/PolarisSparkCatalogIT.java similarity index 96% rename from plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogOperationsIT.java rename to plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/PolarisSparkCatalogIT.java index ed6f798af2..a2ba89c984 100644 --- a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogOperationsIT.java +++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/PolarisSparkCatalogIT.java @@ -36,13 +36,13 @@ import org.apache.spark.sql.types.StructType; import org.junit.jupiter.api.Test; +/** + * This integration directly performs operations using the SparkCatalog instance, instead of going + * through Spark SQL interface. Some operations don't have an easy way to trigger through the + * SparkSQL, for example, list nested namespaces. + */ @QuarkusIntegrationTest -public class SparkCatalogOperationsIT extends SparkIntegrationBase { - /** - * This integration directly performs operations using the SparkCatalog instance, instead of going - * through Spark SQL interface. Some operations don't have an easy way to trigger through the - * SparkSQL, for example, list nested namespaces. 
- */ +public class PolarisSparkCatalogIT extends SparkIntegrationBase { private static StructType schema = new StructType().add("id", "long").add("name", "string"); @Test diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIcebergCatalogIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIcebergIT.java similarity index 91% rename from plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIcebergCatalogIT.java rename to plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIcebergIT.java index 356e57a81a..d5bf98f4ee 100644 --- a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIcebergCatalogIT.java +++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIcebergIT.java @@ -26,7 +26,7 @@ import org.junit.jupiter.api.Test; @QuarkusIntegrationTest -public class SparkIcebergCatalogIT extends SparkIntegrationBase { +public class SparkIcebergIT extends SparkIntegrationBase { @Test public void testNamespaces() { List<Object[]> namespaces = sql("SHOW NAMESPACES"); @@ -99,11 +99,12 @@ public void renameView() { String renamedView = "renamedView"; sql("CREATE VIEW %s AS SELECT 1 AS id", viewName); List<Object[]> views = sql("SHOW VIEWS"); + assertThat(views.size()).isEqualTo(1); assertThat(views).contains(new Object[] {"ns", viewName, false}); - // TODO: update this test once table related operations are supported - // spark view rename triggers a loadTable underneath - assertThatThrownBy(() -> sql("ALTER VIEW %s RENAME TO %s", viewName, renamedView)) - .isInstanceOf(UnsupportedOperationException.class); + sql("ALTER VIEW %s RENAME TO %s", viewName, renamedView); + views = sql("SHOW VIEWS"); + assertThat(views.size()).isEqualTo(1); + assertThat(views).contains(new Object[] {"ns", renamedView, false}); } } diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java index ae4ebe53b8..581325f515 100644 --- a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java +++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java @@ -38,6 +38,9 @@ protected SparkSession.Builder withCatalog(SparkSession.Builder builder, String .config( String.format("spark.sql.catalog.%s", catalogName), "org.apache.polaris.spark.SparkCatalog") + .config( + "spark.sql.extensions", + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") .config("spark.sql.warehouse.dir", warehouseDir.toString()) .config(String.format("spark.sql.catalog.%s.type", catalogName), "rest") .config( diff --git a/settings.gradle.kts b/settings.gradle.kts index 1de9230438..cbfbc3a269 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -74,12 +74,12 @@ for (sparkVersion in sparkVersions) { val sparkArtifactId = "polaris-spark-${sparkVersion}_${scalaVersion}" val sparkIntArtifactId = "polaris-spark-integration-${sparkVersion}_${scalaVersion}" polarisProject( - "polaris-spark-${sparkVersion}_${scalaVersion}", - file("${polarisSparkDir}/v${sparkVersion}/spark"), + "polaris-spark-${sparkVersion}_${scalaVersion}", + file("${polarisSparkDir}/v${sparkVersion}/spark"), ) polarisProject( -
"polaris-spark-integration-${sparkVersion}_${scalaVersion}", - file("${polarisSparkDir}/v${sparkVersion}/integration"), + "polaris-spark-integration-${sparkVersion}_${scalaVersion}", + file("${polarisSparkDir}/v${sparkVersion}/integration"), ) if (first) { first = false From f518f4b0e525845168e4bcda0b69dc5bf871c708 Mon Sep 17 00:00:00 2001 From: Yun Zou Date: Fri, 11 Apr 2025 18:38:17 -0700 Subject: [PATCH 5/7] update class comments --- .../{PolarisSparkCatalogIT.java => SparkCatalogIT.java} | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) rename plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/{PolarisSparkCatalogIT.java => SparkCatalogIT.java} (95%) diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/PolarisSparkCatalogIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogIT.java similarity index 95% rename from plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/PolarisSparkCatalogIT.java rename to plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogIT.java index a2ba89c984..7beb71728c 100644 --- a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/PolarisSparkCatalogIT.java +++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogIT.java @@ -37,12 +37,13 @@ import org.junit.jupiter.api.Test; /** - * This integration directly performs operations using the SparkCatalog instance, instead of going - * through Spark SQL interface. Some operations don't have an easy way to trigger through the - * SparkSQL, for example, list nested namespaces. + * This integration directly performs operations using the Polaris SparkCatalog instance, instead of + * going through Spark SQL interface. This provides a more direct testing capability against the + * Polaris SparkCatalog operations, some operations like listNamespaces under a namespace can not be + * triggered through a SQL interface directly with Spark. 
*/ @QuarkusIntegrationTest -public class PolarisSparkCatalogIT extends SparkIntegrationBase { +public class SparkCatalogIT extends SparkIntegrationBase { private static StructType schema = new StructType().add("id", "long").add("name", "string"); @Test From 4954233c8ffbbeb058d2f5e59b71e1cc9fbe4a42 Mon Sep 17 00:00:00 2001 From: Yun Zou Date: Mon, 14 Apr 2025 17:56:52 -0700 Subject: [PATCH 6/7] add base integration --- ...CatalogIT.java => SparkCatalogBaseIT.java} | 120 ++++++++++-------- .../quarkus/it/SparkCatalogIcebergIT.java | 54 ++++++++ .../quarkus/it/SparkCatalogPolarisIT.java | 30 +++++ .../it/{SparkIcebergIT.java => SparkIT.java} | 2 +- .../quarkus/it/SparkIntegrationBase.java | 12 -- 5 files changed, 150 insertions(+), 68 deletions(-) rename plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/{SparkCatalogIT.java => SparkCatalogBaseIT.java} (64%) create mode 100644 plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogIcebergIT.java create mode 100644 plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogPolarisIT.java rename plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/{SparkIcebergIT.java => SparkIT.java} (98%) diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogBaseIT.java similarity index 64% rename from plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogIT.java rename to plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogBaseIT.java index 7beb71728c..0b6ae70543 100644 --- a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogIT.java +++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogBaseIT.java @@ -21,19 +21,18 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import io.quarkus.test.junit.QuarkusIntegrationTest; import java.util.Arrays; import java.util.Map; import org.apache.iceberg.exceptions.BadRequestException; -import org.apache.polaris.spark.SparkCatalog; +import org.apache.iceberg.spark.SupportsReplaceView; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; import org.apache.spark.sql.catalyst.analysis.NoSuchViewException; -import org.apache.spark.sql.connector.catalog.Identifier; -import org.apache.spark.sql.connector.catalog.NamespaceChange; -import org.apache.spark.sql.connector.catalog.View; -import org.apache.spark.sql.connector.catalog.ViewChange; +import org.apache.spark.sql.connector.catalog.*; import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; /** @@ -43,94 +42,107 @@ * triggered through a SQL interface directly with Spark. 
*/ @QuarkusIntegrationTest -public class SparkCatalogIT extends SparkIntegrationBase { +public abstract class SparkCatalogBaseIT extends SparkIntegrationBase { private static StructType schema = new StructType().add("id", "long").add("name", "string"); + protected StagingTableCatalog tableCatalog = null; + protected SupportsNamespaces namespaceCatalog = null; + protected ViewCatalog viewCatalog = null; + protected SupportsReplaceView replaceViewCatalog = null; + + @BeforeEach + protected void loadCatalogs() { + Preconditions.checkArgument(spark != null, "No active spark found"); + Preconditions.checkArgument(catalogName != null, "No catalogName found"); + CatalogPlugin catalogPlugin = spark.sessionState().catalogManager().catalog(catalogName); + tableCatalog = (StagingTableCatalog) catalogPlugin; + namespaceCatalog = (SupportsNamespaces) catalogPlugin; + viewCatalog = (ViewCatalog) catalogPlugin; + replaceViewCatalog = (SupportsReplaceView) catalogPlugin; + } @Test void testNamespaceOperations() throws Exception { - SparkCatalog catalog = loadSparkCatalog(); - String[][] lv1ns = new String[][] {{"l1ns1"}, {"l1ns2"}}; String[][] lv2ns1 = new String[][] {{"l1ns1", "l2ns1"}, {"l1ns1", "l2ns2"}}; String[][] lv2ns2 = new String[][] {{"l1ns2", "l2ns3"}}; // create the namespaces for (String[] namespace : lv1ns) { - catalog.createNamespace(namespace, Maps.newHashMap()); + namespaceCatalog.createNamespace(namespace, Maps.newHashMap()); } for (String[] namespace : lv2ns1) { - catalog.createNamespace(namespace, Maps.newHashMap()); + namespaceCatalog.createNamespace(namespace, Maps.newHashMap()); } for (String[] namespace : lv2ns2) { - catalog.createNamespace(namespace, Maps.newHashMap()); + namespaceCatalog.createNamespace(namespace, Maps.newHashMap()); } // list namespaces under root - String[][] lv1nsResult = catalog.listNamespaces(); + String[][] lv1nsResult = namespaceCatalog.listNamespaces(); assertThat(lv1nsResult.length).isEqualTo(lv1ns.length); for (String[] namespace : lv1ns) { assertThat(Arrays.asList(lv1nsResult)).contains(namespace); } // list namespace under l1ns1 - String[][] lv2ns1Result = catalog.listNamespaces(lv1ns[0]); + String[][] lv2ns1Result = namespaceCatalog.listNamespaces(lv1ns[0]); assertThat(lv2ns1Result.length).isEqualTo(lv2ns1.length); for (String[] namespace : lv2ns1) { assertThat(Arrays.asList(lv2ns1Result)).contains(namespace); } // list namespace under l1ns2 - String[][] lv2ns2Result = catalog.listNamespaces(lv1ns[1]); + String[][] lv2ns2Result = namespaceCatalog.listNamespaces(lv1ns[1]); assertThat(lv2ns2Result.length).isEqualTo(lv2ns2.length); for (String[] namespace : lv2ns2) { assertThat(Arrays.asList(lv2ns2Result)).contains(namespace); } // no namespace under l1ns2.l2ns3 - assertThat(catalog.listNamespaces(lv2ns2[0]).length).isEqualTo(0); + assertThat(namespaceCatalog.listNamespaces(lv2ns2[0]).length).isEqualTo(0); // drop the nested namespace under lv1ns[1] - catalog.dropNamespace(lv2ns2[0], true); - assertThat(catalog.listNamespaces(lv1ns[1]).length).isEqualTo(0); - catalog.dropNamespace(lv1ns[1], true); - assertThatThrownBy(() -> catalog.listNamespaces(lv1ns[1])) + namespaceCatalog.dropNamespace(lv2ns2[0], true); + assertThat(namespaceCatalog.listNamespaces(lv1ns[1]).length).isEqualTo(0); + namespaceCatalog.dropNamespace(lv1ns[1], true); + assertThatThrownBy(() -> namespaceCatalog.listNamespaces(lv1ns[1])) .isInstanceOf(NoSuchNamespaceException.class); // directly drop lv1ns[0] should fail - assertThatThrownBy(() -> catalog.dropNamespace(lv1ns[0], true)) + 
assertThatThrownBy(() -> namespaceCatalog.dropNamespace(lv1ns[0], true)) .isInstanceOf(BadRequestException.class); for (String[] namespace : lv2ns1) { - catalog.dropNamespace(namespace, true); + namespaceCatalog.dropNamespace(namespace, true); } - catalog.dropNamespace(lv1ns[0], true); + namespaceCatalog.dropNamespace(lv1ns[0], true); // no more namespace available - assertThat(catalog.listNamespaces().length).isEqualTo(0); + assertThat(namespaceCatalog.listNamespaces().length).isEqualTo(0); } @Test void testAlterNamespace() throws Exception { - SparkCatalog catalog = loadSparkCatalog(); String[] namespace = new String[] {"ns1"}; Map<String, String> metadata = Maps.newHashMap(); metadata.put("owner", "user1"); - catalog.createNamespace(namespace, metadata); - assertThat(catalog.loadNamespaceMetadata(namespace)).contains(Map.entry("owner", "user1")); + namespaceCatalog.createNamespace(namespace, metadata); + assertThat(namespaceCatalog.loadNamespaceMetadata(namespace)) + .contains(Map.entry("owner", "user1")); - catalog.alterNamespace(namespace, NamespaceChange.setProperty("owner", "new-user")); - assertThat(catalog.loadNamespaceMetadata(namespace)).contains(Map.entry("owner", "new-user")); + namespaceCatalog.alterNamespace(namespace, NamespaceChange.setProperty("owner", "new-user")); + assertThat(namespaceCatalog.loadNamespaceMetadata(namespace)) + .contains(Map.entry("owner", "new-user")); // drop the namespace - catalog.dropNamespace(namespace, true); + namespaceCatalog.dropNamespace(namespace, true); } @Test void testBasicViewOperations() throws Exception { - SparkCatalog catalog = loadSparkCatalog(); String[] namespace = new String[] {"ns"}; - catalog.createNamespace(namespace, Maps.newHashMap()); + namespaceCatalog.createNamespace(namespace, Maps.newHashMap()); Identifier viewIdentifier = Identifier.of(namespace, "test-view"); String viewSql = "select id from test-table where id < 3"; - catalog.createView( + viewCatalog.createView( viewIdentifier, viewSql, catalogName, @@ -142,21 +154,21 @@ void testBasicViewOperations() throws Exception { Maps.newHashMap()); // load the view - View view = catalog.loadView(viewIdentifier); + View view = viewCatalog.loadView(viewIdentifier); assertThat(view.query()).isEqualTo(viewSql); assertThat(view.schema()).isEqualTo(schema); // alter the view properties - catalog.alterView(viewIdentifier, ViewChange.setProperty("owner", "user1")); - view = catalog.loadView(viewIdentifier); + viewCatalog.alterView(viewIdentifier, ViewChange.setProperty("owner", "user1")); + view = viewCatalog.loadView(viewIdentifier); assertThat(view.properties()).contains(Map.entry("owner", "user1")); // rename the view Identifier newIdentifier = Identifier.of(namespace, "new-view"); - catalog.renameView(viewIdentifier, newIdentifier); - assertThatThrownBy(() -> catalog.loadView(viewIdentifier)) + viewCatalog.renameView(viewIdentifier, newIdentifier); + assertThatThrownBy(() -> viewCatalog.loadView(viewIdentifier)) .isInstanceOf(NoSuchViewException.class); - view = catalog.loadView(newIdentifier); + view = viewCatalog.loadView(newIdentifier); assertThat(view.query()).isEqualTo(viewSql); assertThat(view.schema()).isEqualTo(schema); @@ -164,7 +176,7 @@ void testBasicViewOperations() throws Exception { String newSql = "select id from test-table where id == 3"; Map<String, String> properties = Maps.newHashMap(); properties.put("owner", "test-user"); - catalog.replaceView( + replaceViewCatalog.replaceView( newIdentifier, newSql, catalogName, namespace, schema, new
String[0], new String[0], properties); - view = catalog.loadView(newIdentifier); + view = viewCatalog.loadView(newIdentifier); assertThat(view.query()).isEqualTo(newSql); assertThat(view.properties()).contains(Map.entry("owner", "test-user")); // drop the view - catalog.dropView(newIdentifier); - assertThatThrownBy(() -> catalog.loadView(newIdentifier)) + viewCatalog.dropView(newIdentifier); + assertThatThrownBy(() -> viewCatalog.loadView(newIdentifier)) .isInstanceOf(NoSuchViewException.class); } @Test void testListViews() throws Exception { - SparkCatalog catalog = loadSparkCatalog(); - String[] l1ns = new String[] {"ns"}; - catalog.createNamespace(l1ns, Maps.newHashMap()); + namespaceCatalog.createNamespace(l1ns, Maps.newHashMap()); // create a new namespace under the default NS String[] l2ns = new String[] {"ns", "nsl2"}; - catalog.createNamespace(l2ns, Maps.newHashMap()); + namespaceCatalog.createNamespace(l2ns, Maps.newHashMap()); // create one view under l1 String view1Name = "test-view1"; String view1SQL = "select id from test-table where id >= 3"; - catalog.createView( + viewCatalog.createView( Identifier.of(l1ns, view1Name), view1SQL, catalogName, @@ -214,7 +224,7 @@ void testListViews() throws Exception { "select id from test-table where id == 3", "select id from test-table where id < 3" }; for (int i = 0; i < nsl2ViewNames.length; i++) { - catalog.createView( + viewCatalog.createView( Identifier.of(l2ns, nsl2ViewNames[i]), nsl2ViewSQLs[i], catalogName, @@ -226,26 +236,26 @@ void testListViews() throws Exception { Maps.newHashMap()); } // list views under l1ns - Identifier[] l1Views = catalog.listViews(l1ns); + Identifier[] l1Views = viewCatalog.listViews(l1ns); assertThat(l1Views.length).isEqualTo(1); assertThat(l1Views[0].name()).isEqualTo(view1Name); // list views under l2ns - Identifier[] l2Views = catalog.listViews(l2ns); + Identifier[] l2Views = viewCatalog.listViews(l2ns); assertThat(l2Views.length).isEqualTo(nsl2ViewSQLs.length); for (String name : nsl2ViewNames) { assertThat(Arrays.asList(l2Views)).contains(Identifier.of(l2ns, name)); } // drop namespace fails since there are views under it - assertThatThrownBy(() -> catalog.dropNamespace(l2ns, true)) + assertThatThrownBy(() -> namespaceCatalog.dropNamespace(l2ns, true)) .isInstanceOf(BadRequestException.class); // drop the views for (String name : nsl2ViewNames) { - catalog.dropView(Identifier.of(l2ns, name)); + viewCatalog.dropView(Identifier.of(l2ns, name)); } - catalog.dropNamespace(l2ns, true); - catalog.dropView(Identifier.of(l1ns, view1Name)); - catalog.dropNamespace(l1ns, true); + namespaceCatalog.dropNamespace(l2ns, true); + viewCatalog.dropView(Identifier.of(l1ns, view1Name)); + namespaceCatalog.dropNamespace(l1ns, true); } } diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogIcebergIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogIcebergIT.java new file mode 100644 index 0000000000..ba7aa98766 --- /dev/null +++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogIcebergIT.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.spark.quarkus.it; + +import io.quarkus.test.junit.QuarkusIntegrationTest; +import org.apache.spark.sql.SparkSession; + +/** + * This integration directly performs operations using the Polaris SparkCatalog instance, instead of + * going through Spark SQL interface. This provides a more direct testing capability against the + * Polaris SparkCatalog operations, some operations like listNamespaces under a namespace can not be + * triggered through a SQL interface directly with Spark. + */ +@QuarkusIntegrationTest +public class SparkCatalogIcebergIT extends SparkCatalogBaseIT { + /** Initialize the spark catalog to use the iceberg spark catalog. */ + @Override + protected SparkSession.Builder withCatalog(SparkSession.Builder builder, String catalogName) { + return builder + .config( + String.format("spark.sql.catalog.%s", catalogName), + "org.apache.iceberg.spark.SparkCatalog") + .config("spark.sql.warehouse.dir", warehouseDir.toString()) + .config(String.format("spark.sql.catalog.%s.type", catalogName), "rest") + .config( + String.format("spark.sql.catalog.%s.uri", catalogName), + endpoints.catalogApiEndpoint().toString()) + .config(String.format("spark.sql.catalog.%s.warehouse", catalogName), catalogName) + .config(String.format("spark.sql.catalog.%s.scope", catalogName), "PRINCIPAL_ROLE:ALL") + .config( + String.format("spark.sql.catalog.%s.header.realm", catalogName), endpoints.realmId()) + .config(String.format("spark.sql.catalog.%s.token", catalogName), sparkToken) + .config(String.format("spark.sql.catalog.%s.s3.access-key-id", catalogName), "fakekey") + .config( + String.format("spark.sql.catalog.%s.s3.secret-access-key", catalogName), "fakesecret") + .config(String.format("spark.sql.catalog.%s.s3.region", catalogName), "us-west-2"); + } +} diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogPolarisIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogPolarisIT.java new file mode 100644 index 0000000000..948ffb4497 --- /dev/null +++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogPolarisIT.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.spark.quarkus.it; + +import io.quarkus.test.junit.QuarkusIntegrationTest; + +/** + * This integration directly performs operations using the Polaris SparkCatalog instance, instead of + * going through Spark SQL interface. This provides a more direct testing capability against the + * Polaris SparkCatalog operations, some operations like listNamespaces under a namespace can not be + * triggered through a SQL interface directly with Spark. + */ +@QuarkusIntegrationTest +public class SparkCatalogPolarisIT extends SparkCatalogBaseIT {} diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIcebergIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIT.java similarity index 98% rename from plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIcebergIT.java rename to plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIT.java index d5bf98f4ee..f9af2609d0 100644 --- a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIcebergIT.java +++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIT.java @@ -26,7 +26,7 @@ import org.junit.jupiter.api.Test; @QuarkusIntegrationTest -public class SparkIcebergIT extends SparkIntegrationBase { +public class SparkIT extends SparkIntegrationBase { @Test public void testNamespaces() { List namespaces = sql("SHOW NAMESPACES"); diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java index 581325f515..b5006d6a79 100644 --- a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java +++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java @@ -18,17 +18,14 @@ */ package org.apache.polaris.spark.quarkus.it; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.errorprone.annotations.FormatMethod; import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.polaris.service.it.ext.PolarisSparkIntegrationTestBase; -import org.apache.polaris.spark.SparkCatalog; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.connector.catalog.CatalogPlugin; public abstract class SparkIntegrationBase extends PolarisSparkIntegrationTestBase { @@ -111,13 +108,4 @@ private Object[] toJava(Row row) { }) .toArray(Object[]::new); } - - protected SparkCatalog loadSparkCatalog() { - Preconditions.checkArgument(spark != null, "No active spark found"); - Preconditions.checkArgument(catalogName != null, "No catalogName found"); - CatalogPlugin catalogPlugin = spark.sessionState().catalogManager().catalog(catalogName); - Preconditions.checkArgument( - catalogPlugin instanceof SparkCatalog, "The Spark Catalog is not a Polaris SparkCatalog"); - return (SparkCatalog) catalogPlugin; - } } From 2f3a2b1c50ab6e345f9be0d9d8918b46fb83d825 Mon Sep 17 00:00:00 2001 From: Yun Zou Date: Mon, 14 Apr 2025 18:03:48 -0700 Subject: [PATCH 7/7] clean up comments --- 
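Note on the final test layout, for reviewers: SparkCatalogBaseIT resolves the configured CatalogPlugin into the specific Spark catalog interfaces (StagingTableCatalog, SupportsNamespaces, ViewCatalog, SupportsReplaceView) in a @BeforeEach hook, so the same suite can run against any catalog plugin that implements them. SparkCatalogPolarisIT inherits the Polaris SparkCatalog configuration from SparkIntegrationBase, while SparkCatalogIcebergIT overrides withCatalog to point at the Iceberg SparkCatalog. A further variant would only need to override withCatalog; a minimal sketch (com.example.CustomSparkCatalog is a hypothetical plugin class, not an existing implementation):

    import io.quarkus.test.junit.QuarkusIntegrationTest;
    import org.apache.spark.sql.SparkSession;

    @QuarkusIntegrationTest
    public class SparkCatalogCustomIT extends SparkCatalogBaseIT {
      @Override
      protected SparkSession.Builder withCatalog(SparkSession.Builder builder, String catalogName) {
        // Hypothetical catalog plugin; it must implement the namespace/view
        // interfaces that loadCatalogs() casts to in SparkCatalogBaseIT.
        return builder
            .config(
                String.format("spark.sql.catalog.%s", catalogName),
                "com.example.CustomSparkCatalog")
            .config("spark.sql.warehouse.dir", warehouseDir.toString());
      }
    }
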
.../apache/polaris/spark/quarkus/it/SparkCatalogBaseIT.java | 6 +++--- .../polaris/spark/quarkus/it/SparkCatalogIcebergIT.java | 6 ------ .../polaris/spark/quarkus/it/SparkCatalogPolarisIT.java | 6 ------ 3 files changed, 3 insertions(+), 15 deletions(-) diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogBaseIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogBaseIT.java index 0b6ae70543..575436e33b 100644 --- a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogBaseIT.java +++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogBaseIT.java @@ -36,9 +36,9 @@ import org.junit.jupiter.api.Test; /** - * This integration directly performs operations using the Polaris SparkCatalog instance, instead of - * going through Spark SQL interface. This provides a more direct testing capability against the - * Polaris SparkCatalog operations, some operations like listNamespaces under a namespace can not be + * This integration test directly performs operations using the SparkCatalog instance, instead of going + * through the Spark SQL interface. This provides a more direct testing capability against the Polaris + * SparkCatalog operations; some operations, such as listNamespaces under a namespace, cannot be * triggered through a SQL interface directly with Spark. */ @QuarkusIntegrationTest diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogIcebergIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogIcebergIT.java index ba7aa98766..f3c411df23 100644 --- a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogIcebergIT.java +++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogIcebergIT.java @@ -21,12 +21,6 @@ import io.quarkus.test.junit.QuarkusIntegrationTest; import org.apache.spark.sql.SparkSession; -/** - * This integration directly performs operations using the Polaris SparkCatalog instance, instead of - * going through Spark SQL interface. This provides a more direct testing capability against the - * Polaris SparkCatalog operations, some operations like listNamespaces under a namespace can not be - * triggered through a SQL interface directly with Spark. - */ @QuarkusIntegrationTest public class SparkCatalogIcebergIT extends SparkCatalogBaseIT { /** Initialize the spark catalog to use the iceberg spark catalog. */ diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogPolarisIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogPolarisIT.java index 948ffb4497..97a4c222db 100644 --- a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogPolarisIT.java +++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogPolarisIT.java @@ -20,11 +20,5 @@ import io.quarkus.test.junit.QuarkusIntegrationTest; -/** - * This integration directly performs operations using the Polaris SparkCatalog instance, instead of - *
This provides a more direct testing capability against the - * Polaris SparkCatalog operations, some operations like listNamespaces under a namespace can not be - * triggered through a SQL interface directly with Spark. - */ @QuarkusIntegrationTest public class SparkCatalogPolarisIT extends SparkCatalogBaseIT {}