diff --git a/build-logic/src/main/kotlin/Utilities.kt b/build-logic/src/main/kotlin/Utilities.kt
new file mode 100644
index 0000000000..6c235bf398
--- /dev/null
+++ b/build-logic/src/main/kotlin/Utilities.kt
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.gradle.api.Project
+import org.gradle.process.JavaForkOptions
+
+/**
+ * Extracts the Scala version from a Polaris Spark project name and points the build directory to
+ * a sub-directory named after that Scala version. The Polaris Spark project name follows the
+ * format <project-name>-<spark-version>_<scala-version>, for example: polaris-spark-3.5_2.12.
+ */
+fun Project.getAndUseScalaVersionForProject(): String {
+  val sparkScala = project.name.split("-").last().split("_")
+
+  val scalaVersion = sparkScala[1]
+
+  // direct the build to build/<scalaVersion> to avoid potential collision problems
+  project.layout.buildDirectory.set(layout.buildDirectory.dir(scalaVersion).get())
+
+  return scalaVersion
+}
+
+/**
+ * Adds the JPMS options required for Spark to run on Java 17, taken from the
+ * `DEFAULT_MODULE_OPTIONS` constant in `org.apache.spark.launcher.JavaModuleOptions`.
+ */
+fun JavaForkOptions.addSparkJvmOptions() {
+  jvmArgs =
+    (jvmArgs ?: emptyList()) +
+      listOf(
+        // Spark 3.3+
+        "-XX:+IgnoreUnrecognizedVMOptions",
+        "--add-opens=java.base/java.lang=ALL-UNNAMED",
+        "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED",
+        "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED",
+        "--add-opens=java.base/java.io=ALL-UNNAMED",
+        "--add-opens=java.base/java.net=ALL-UNNAMED",
+        "--add-opens=java.base/java.nio=ALL-UNNAMED",
+        "--add-opens=java.base/java.util=ALL-UNNAMED",
+        "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED",
+        "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED",
+        "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
+        "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED",
+        "--add-opens=java.base/sun.security.action=ALL-UNNAMED",
+        "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED",
+        "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED",
+        // Spark 3.4+
+        "-Djdk.reflect.useDirectMethodHandle=false",
+      )
+}
diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/ext/PolarisSparkIntegrationTestBase.java b/integration-tests/src/main/java/org/apache/polaris/service/it/ext/PolarisSparkIntegrationTestBase.java
new file mode 100644
index 0000000000..670d39a7da
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/polaris/service/it/ext/PolarisSparkIntegrationTestBase.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
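Both helpers above live in `build-logic`, so every module build script in the repository can call them directly. The following is a minimal sketch (not part of this change) of how a Spark module's `build.gradle.kts` could combine them; the project name, the dependency line, and the `Test` task wiring are illustrative assumptions, while the Scala library versions mirror `pluginlibs.versions.toml` elsewhere in this diff.

```kotlin
// Illustrative only: assumes a project named "polaris-spark-3.5_2.12"
// (see the naming convention described in the KDoc above).
val scalaVersion = getAndUseScalaVersionForProject() // returns "2.12"; build dir becomes build/2.12

dependencies {
  // pick the Scala library matching the suffix extracted from the project name
  val scalaLibraryVersion = if (scalaVersion == "2.12") "2.12.19" else "2.13.15"
  implementation("org.scala-lang:scala-library:$scalaLibraryVersion")
}

// Any forked JVM that drives Spark on Java 17+ needs the JPMS flags.
tasks.withType<Test>().configureEach { addSparkJvmOptions() }
```

The `quarkus/spark-tests` and `plugins/spark/v3.5/integration` build scripts later in this diff call `addSparkJvmOptions()` on their `intTest` tasks in exactly this way.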
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.it.ext; + +import static org.apache.polaris.service.it.env.PolarisClient.polarisClient; + +import com.adobe.testing.s3mock.testcontainers.S3MockContainer; +import java.io.IOException; +import java.net.URI; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import org.apache.polaris.core.admin.model.AwsStorageConfigInfo; +import org.apache.polaris.core.admin.model.Catalog; +import org.apache.polaris.core.admin.model.CatalogProperties; +import org.apache.polaris.core.admin.model.ExternalCatalog; +import org.apache.polaris.core.admin.model.PolarisCatalog; +import org.apache.polaris.core.admin.model.StorageConfigInfo; +import org.apache.polaris.service.it.env.CatalogApi; +import org.apache.polaris.service.it.env.ClientCredentials; +import org.apache.polaris.service.it.env.IntegrationTestsHelper; +import org.apache.polaris.service.it.env.ManagementApi; +import org.apache.polaris.service.it.env.PolarisApiEndpoints; +import org.apache.polaris.service.it.env.PolarisClient; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.intellij.lang.annotations.Language; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.LoggerFactory; + +@ExtendWith(PolarisIntegrationTestExtension.class) +public abstract class PolarisSparkIntegrationTestBase { + protected static final S3MockContainer s3Container = + new S3MockContainer("3.11.0").withInitialBuckets("my-bucket,my-old-bucket"); + protected static SparkSession spark; + protected PolarisApiEndpoints endpoints; + protected PolarisClient client; + protected ManagementApi managementApi; + protected CatalogApi catalogApi; + protected String sparkToken; + protected String catalogName; + protected String externalCatalogName; + + protected URI warehouseDir; + + @BeforeAll + public static void setup() throws IOException { + s3Container.start(); + } + + @AfterAll + public static void cleanup() { + s3Container.stop(); + } + + @BeforeEach + public void before( + PolarisApiEndpoints apiEndpoints, ClientCredentials credentials, @TempDir Path tempDir) { + endpoints = apiEndpoints; + client = polarisClient(endpoints); + sparkToken = client.obtainToken(credentials); + managementApi = client.managementApi(credentials); + catalogApi = client.catalogApi(credentials); + + warehouseDir = IntegrationTestsHelper.getTemporaryDirectory(tempDir).resolve("spark-warehouse"); + + catalogName = client.newEntityName("spark_catalog"); + externalCatalogName = client.newEntityName("spark_ext_catalog"); + + AwsStorageConfigInfo awsConfigModel = + AwsStorageConfigInfo.builder() + .setRoleArn("arn:aws:iam::123456789012:role/my-role") + 
.setExternalId("externalId") + .setUserArn("userArn") + .setStorageType(StorageConfigInfo.StorageTypeEnum.S3) + .setAllowedLocations(List.of("s3://my-old-bucket/path/to/data")) + .build(); + CatalogProperties props = new CatalogProperties("s3://my-bucket/path/to/data"); + props.putAll( + Map.of( + "table-default.s3.endpoint", + s3Container.getHttpEndpoint(), + "table-default.s3.path-style-access", + "true", + "table-default.s3.access-key-id", + "foo", + "table-default.s3.secret-access-key", + "bar", + "s3.endpoint", + s3Container.getHttpEndpoint(), + "s3.path-style-access", + "true", + "s3.access-key-id", + "foo", + "s3.secret-access-key", + "bar")); + Catalog catalog = + PolarisCatalog.builder() + .setType(Catalog.TypeEnum.INTERNAL) + .setName(catalogName) + .setProperties(props) + .setStorageConfigInfo(awsConfigModel) + .build(); + + managementApi.createCatalog(catalog); + + CatalogProperties externalProps = new CatalogProperties("s3://my-bucket/path/to/data"); + externalProps.putAll( + Map.of( + "table-default.s3.endpoint", + s3Container.getHttpEndpoint(), + "table-default.s3.path-style-access", + "true", + "table-default.s3.access-key-id", + "foo", + "table-default.s3.secret-access-key", + "bar", + "s3.endpoint", + s3Container.getHttpEndpoint(), + "s3.path-style-access", + "true", + "s3.access-key-id", + "foo", + "s3.secret-access-key", + "bar")); + Catalog externalCatalog = + ExternalCatalog.builder() + .setType(Catalog.TypeEnum.EXTERNAL) + .setName(externalCatalogName) + .setProperties(externalProps) + .setStorageConfigInfo(awsConfigModel) + .build(); + + managementApi.createCatalog(externalCatalog); + + SparkSession.Builder sessionBuilder = + SparkSession.builder() + .master("local[1]") + .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") + .config( + "spark.hadoop.fs.s3.aws.credentials.provider", + "org.apache.hadoop.fs.s3.TemporaryAWSCredentialsProvider") + .config("spark.hadoop.fs.s3.access.key", "foo") + .config("spark.hadoop.fs.s3.secret.key", "bar") + .config( + "spark.sql.extensions", + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + .config("spark.ui.showConsoleProgress", false) + .config("spark.ui.enabled", "false"); + spark = + withCatalog(withCatalog(sessionBuilder, catalogName), externalCatalogName).getOrCreate(); + + onSpark("USE " + catalogName); + } + + protected SparkSession.Builder withCatalog(SparkSession.Builder builder, String catalogName) { + return builder + .config( + String.format("spark.sql.catalog.%s", catalogName), + "org.apache.iceberg.spark.SparkCatalog") + .config("spark.sql.warehouse.dir", warehouseDir.toString()) + .config(String.format("spark.sql.catalog.%s.type", catalogName), "rest") + .config( + String.format("spark.sql.catalog.%s.uri", catalogName), + endpoints.catalogApiEndpoint().toString()) + .config(String.format("spark.sql.catalog.%s.warehouse", catalogName), catalogName) + .config(String.format("spark.sql.catalog.%s.scope", catalogName), "PRINCIPAL_ROLE:ALL") + .config( + String.format("spark.sql.catalog.%s.header.realm", catalogName), endpoints.realmId()) + .config(String.format("spark.sql.catalog.%s.token", catalogName), sparkToken) + .config(String.format("spark.sql.catalog.%s.s3.access-key-id", catalogName), "fakekey") + .config( + String.format("spark.sql.catalog.%s.s3.secret-access-key", catalogName), "fakesecret") + .config(String.format("spark.sql.catalog.%s.s3.region", catalogName), "us-west-2"); + } + + @AfterEach + public void after() throws Exception { + 
cleanupCatalog(catalogName); + cleanupCatalog(externalCatalogName); + try { + SparkSession.clearDefaultSession(); + SparkSession.clearActiveSession(); + spark.close(); + } catch (Exception e) { + LoggerFactory.getLogger(getClass()).error("Unable to close spark session", e); + } + + client.close(); + } + + protected void cleanupCatalog(String catalogName) { + onSpark("USE " + catalogName); + List namespaces = onSpark("SHOW NAMESPACES").collectAsList(); + for (Row namespace : namespaces) { + List tables = onSpark("SHOW TABLES IN " + namespace.getString(0)).collectAsList(); + for (Row table : tables) { + onSpark("DROP TABLE " + namespace.getString(0) + "." + table.getString(1)); + } + List views = onSpark("SHOW VIEWS IN " + namespace.getString(0)).collectAsList(); + for (Row view : views) { + onSpark("DROP VIEW " + namespace.getString(0) + "." + view.getString(1)); + } + onSpark("DROP NAMESPACE " + namespace.getString(0)); + } + + managementApi.deleteCatalog(catalogName); + } + + protected static Dataset onSpark(@Language("SQL") String sql) { + return spark.sql(sql); + } +} diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisSparkIntegrationTest.java b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisSparkIntegrationTest.java index 10868c1ea9..2bdd109281 100644 --- a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisSparkIntegrationTest.java +++ b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisSparkIntegrationTest.java @@ -18,47 +18,20 @@ */ package org.apache.polaris.service.it.test; -import static org.apache.polaris.service.it.env.PolarisClient.polarisClient; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import com.adobe.testing.s3mock.testcontainers.S3MockContainer; import com.google.common.collect.ImmutableMap; import jakarta.ws.rs.client.Entity; import jakarta.ws.rs.core.Response; -import java.io.IOException; -import java.net.URI; -import java.nio.file.Path; import java.time.Instant; import java.util.List; import java.util.Map; import org.apache.iceberg.rest.requests.ImmutableRegisterTableRequest; import org.apache.iceberg.rest.responses.LoadTableResponse; -import org.apache.polaris.core.admin.model.AwsStorageConfigInfo; -import org.apache.polaris.core.admin.model.Catalog; -import org.apache.polaris.core.admin.model.CatalogProperties; -import org.apache.polaris.core.admin.model.ExternalCatalog; -import org.apache.polaris.core.admin.model.PolarisCatalog; -import org.apache.polaris.core.admin.model.StorageConfigInfo; -import org.apache.polaris.service.it.env.CatalogApi; -import org.apache.polaris.service.it.env.ClientCredentials; -import org.apache.polaris.service.it.env.IntegrationTestsHelper; -import org.apache.polaris.service.it.env.ManagementApi; -import org.apache.polaris.service.it.env.PolarisApiEndpoints; -import org.apache.polaris.service.it.env.PolarisClient; -import org.apache.polaris.service.it.ext.PolarisIntegrationTestExtension; -import org.apache.spark.sql.Dataset; +import org.apache.polaris.service.it.ext.PolarisSparkIntegrationTestBase; import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import org.intellij.lang.annotations.Language; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import 
org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.api.io.TempDir; -import org.slf4j.LoggerFactory; /** * @implSpec This test expects the server to be configured with the following features enabled: @@ -71,185 +44,7 @@ * {@code true} * */ -@ExtendWith(PolarisIntegrationTestExtension.class) -public class PolarisSparkIntegrationTest { - - private static final S3MockContainer s3Container = - new S3MockContainer("3.11.0").withInitialBuckets("my-bucket,my-old-bucket"); - private static SparkSession spark; - private PolarisApiEndpoints endpoints; - private PolarisClient client; - private ManagementApi managementApi; - private CatalogApi catalogApi; - private String sparkToken; - private String catalogName; - private String externalCatalogName; - - private URI warehouseDir; - - @BeforeAll - public static void setup() throws IOException { - s3Container.start(); - } - - @AfterAll - public static void cleanup() { - s3Container.stop(); - } - - @BeforeEach - public void before( - PolarisApiEndpoints apiEndpoints, ClientCredentials credentials, @TempDir Path tempDir) { - endpoints = apiEndpoints; - client = polarisClient(endpoints); - sparkToken = client.obtainToken(credentials); - managementApi = client.managementApi(credentials); - catalogApi = client.catalogApi(credentials); - - warehouseDir = IntegrationTestsHelper.getTemporaryDirectory(tempDir).resolve("spark-warehouse"); - - catalogName = client.newEntityName("spark_catalog"); - externalCatalogName = client.newEntityName("spark_ext_catalog"); - - AwsStorageConfigInfo awsConfigModel = - AwsStorageConfigInfo.builder() - .setRoleArn("arn:aws:iam::123456789012:role/my-role") - .setExternalId("externalId") - .setUserArn("userArn") - .setStorageType(StorageConfigInfo.StorageTypeEnum.S3) - .setAllowedLocations(List.of("s3://my-old-bucket/path/to/data")) - .build(); - CatalogProperties props = new CatalogProperties("s3://my-bucket/path/to/data"); - props.putAll( - Map.of( - "table-default.s3.endpoint", - s3Container.getHttpEndpoint(), - "table-default.s3.path-style-access", - "true", - "table-default.s3.access-key-id", - "foo", - "table-default.s3.secret-access-key", - "bar", - "s3.endpoint", - s3Container.getHttpEndpoint(), - "s3.path-style-access", - "true", - "s3.access-key-id", - "foo", - "s3.secret-access-key", - "bar")); - Catalog catalog = - PolarisCatalog.builder() - .setType(Catalog.TypeEnum.INTERNAL) - .setName(catalogName) - .setProperties(props) - .setStorageConfigInfo(awsConfigModel) - .build(); - - managementApi.createCatalog(catalog); - - CatalogProperties externalProps = new CatalogProperties("s3://my-bucket/path/to/data"); - externalProps.putAll( - Map.of( - "table-default.s3.endpoint", - s3Container.getHttpEndpoint(), - "table-default.s3.path-style-access", - "true", - "table-default.s3.access-key-id", - "foo", - "table-default.s3.secret-access-key", - "bar", - "s3.endpoint", - s3Container.getHttpEndpoint(), - "s3.path-style-access", - "true", - "s3.access-key-id", - "foo", - "s3.secret-access-key", - "bar")); - Catalog externalCatalog = - ExternalCatalog.builder() - .setType(Catalog.TypeEnum.EXTERNAL) - .setName(externalCatalogName) - .setProperties(externalProps) - .setStorageConfigInfo(awsConfigModel) - .build(); - - managementApi.createCatalog(externalCatalog); - - SparkSession.Builder sessionBuilder = - SparkSession.builder() - .master("local[1]") - .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") - .config( - "spark.hadoop.fs.s3.aws.credentials.provider", - 
"org.apache.hadoop.fs.s3.TemporaryAWSCredentialsProvider") - .config("spark.hadoop.fs.s3.access.key", "foo") - .config("spark.hadoop.fs.s3.secret.key", "bar") - .config( - "spark.sql.extensions", - "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") - .config("spark.ui.showConsoleProgress", false) - .config("spark.ui.enabled", "false"); - spark = - withCatalog(withCatalog(sessionBuilder, catalogName), externalCatalogName).getOrCreate(); - - onSpark("USE " + catalogName); - } - - private SparkSession.Builder withCatalog(SparkSession.Builder builder, String catalogName) { - return builder - .config( - String.format("spark.sql.catalog.%s", catalogName), - "org.apache.iceberg.spark.SparkCatalog") - .config("spark.sql.warehouse.dir", warehouseDir.toString()) - .config(String.format("spark.sql.catalog.%s.type", catalogName), "rest") - .config( - String.format("spark.sql.catalog.%s.uri", catalogName), - endpoints.catalogApiEndpoint().toString()) - .config(String.format("spark.sql.catalog.%s.warehouse", catalogName), catalogName) - .config(String.format("spark.sql.catalog.%s.scope", catalogName), "PRINCIPAL_ROLE:ALL") - .config( - String.format("spark.sql.catalog.%s.header.realm", catalogName), endpoints.realmId()) - .config(String.format("spark.sql.catalog.%s.token", catalogName), sparkToken) - .config(String.format("spark.sql.catalog.%s.s3.access-key-id", catalogName), "fakekey") - .config( - String.format("spark.sql.catalog.%s.s3.secret-access-key", catalogName), "fakesecret") - .config(String.format("spark.sql.catalog.%s.s3.region", catalogName), "us-west-2"); - } - - @AfterEach - public void after() throws Exception { - cleanupCatalog(catalogName); - cleanupCatalog(externalCatalogName); - try { - SparkSession.clearDefaultSession(); - SparkSession.clearActiveSession(); - spark.close(); - } catch (Exception e) { - LoggerFactory.getLogger(getClass()).error("Unable to close spark session", e); - } - - client.close(); - } - - private void cleanupCatalog(String catalogName) { - onSpark("USE " + catalogName); - List namespaces = onSpark("SHOW NAMESPACES").collectAsList(); - for (Row namespace : namespaces) { - List tables = onSpark("SHOW TABLES IN " + namespace.getString(0)).collectAsList(); - for (Row table : tables) { - onSpark("DROP TABLE " + namespace.getString(0) + "." + table.getString(1)); - } - List views = onSpark("SHOW VIEWS IN " + namespace.getString(0)).collectAsList(); - for (Row view : views) { - onSpark("DROP VIEW " + namespace.getString(0) + "." 
+ view.getString(1)); - } - onSpark("DROP NAMESPACE " + namespace.getString(0)); - } - - managementApi.deleteCatalog(catalogName); - } +public class PolarisSparkIntegrationTest extends PolarisSparkIntegrationTestBase { @Test public void testCreateTable() { @@ -363,8 +158,4 @@ private LoadTableResponse loadTable(String catalog, String namespace, String tab return response.readEntity(LoadTableResponse.class); } } - - private static Dataset onSpark(@Language("SQL") String sql) { - return spark.sql(sql); - } } diff --git a/plugins/pluginlibs.versions.toml b/plugins/pluginlibs.versions.toml index e48f6ef45a..4f7288ff64 100644 --- a/plugins/pluginlibs.versions.toml +++ b/plugins/pluginlibs.versions.toml @@ -22,4 +22,3 @@ iceberg = "1.8.1" spark35 = "3.5.5" scala212 = "2.12.19" scala213 = "2.13.15" - diff --git a/plugins/spark/v3.5/integration/build.gradle.kts b/plugins/spark/v3.5/integration/build.gradle.kts new file mode 100644 index 0000000000..e4611feb37 --- /dev/null +++ b/plugins/spark/v3.5/integration/build.gradle.kts @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +plugins { + alias(libs.plugins.quarkus) + alias(libs.plugins.jandex) + id("polaris-quarkus") +} + +// get version information +val sparkMajorVersion = "3.5" +val scalaVersion = getAndUseScalaVersionForProject() +val icebergVersion = pluginlibs.versions.iceberg.get() +val spark35Version = pluginlibs.versions.spark35.get() +val scalaLibraryVersion = + if (scalaVersion == "2.12") { + pluginlibs.versions.scala212.get() + } else { + pluginlibs.versions.scala213.get() + } + +dependencies { + // must be enforced to get a consistent and validated set of dependencies + implementation(enforcedPlatform(libs.quarkus.bom)) { + exclude(group = "org.antlr", module = "antlr4-runtime") + exclude(group = "org.scala-lang", module = "scala-library") + exclude(group = "org.scala-lang", module = "scala-reflect") + } + + implementation(project(":polaris-quarkus-service")) + implementation(project(":polaris-api-management-model")) + implementation(project(":polaris-spark-${sparkMajorVersion}_${scalaVersion}")) + + implementation("org.apache.spark:spark-sql_${scalaVersion}:${spark35Version}") { + // exclude log4j dependencies + exclude("org.apache.logging.log4j", "log4j-slf4j2-impl") + exclude("org.apache.logging.log4j", "log4j-api") + exclude("org.apache.logging.log4j", "log4j-1.2-api") + exclude("org.slf4j", "jul-to-slf4j") + } + + implementation(platform(libs.jackson.bom)) + implementation("com.fasterxml.jackson.core:jackson-annotations") + implementation("com.fasterxml.jackson.core:jackson-core") + implementation("com.fasterxml.jackson.core:jackson-databind") + + implementation( + "org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}:${icebergVersion}" + ) + + testImplementation(testFixtures(project(":polaris-quarkus-service"))) + + testImplementation(platform(libs.quarkus.bom)) + testImplementation("io.quarkus:quarkus-junit5") + testImplementation("io.quarkus:quarkus-rest-client") + testImplementation("io.quarkus:quarkus-rest-client-jackson") + + testImplementation(platform(libs.awssdk.bom)) + testImplementation("software.amazon.awssdk:glue") + testImplementation("software.amazon.awssdk:kms") + testImplementation("software.amazon.awssdk:dynamodb") + + testImplementation(platform(libs.testcontainers.bom)) + testImplementation("org.testcontainers:testcontainers") + testImplementation(libs.s3mock.testcontainers) + + // Required for Spark integration tests + testImplementation(enforcedPlatform("org.scala-lang:scala-library:${scalaLibraryVersion}")) + testImplementation(enforcedPlatform("org.scala-lang:scala-reflect:${scalaLibraryVersion}")) + testImplementation(libs.javax.servlet.api) + testImplementation(libs.antlr4.runtime) +} + +tasks.named("intTest").configure { + maxParallelForks = 1 + systemProperty("java.util.logging.manager", "org.jboss.logmanager.LogManager") + if (System.getenv("AWS_REGION") == null) { + environment("AWS_REGION", "us-west-2") + } + // Note: the test secrets are referenced in + // org.apache.polaris.service.quarkus.it.QuarkusServerManager + environment("POLARIS_BOOTSTRAP_CREDENTIALS", "POLARIS,test-admin,test-secret") + jvmArgs("--add-exports", "java.base/sun.nio.ch=ALL-UNNAMED") + // Need to allow a java security manager after Java 21, for Subject.getSubject to work + // "getSubject is supported only if a security manager is allowed". + systemProperty("java.security.manager", "allow") + // Same issue as above: allow a java security manager after Java 21 + // (this setting is for the application under test, while the setting above is for test code). 
+ systemProperty("quarkus.test.arg-line", "-Djava.security.manager=allow") + val logsDir = project.layout.buildDirectory.get().asFile.resolve("logs") + // delete files from previous runs + doFirst { + // delete log files written by Polaris + logsDir.deleteRecursively() + // delete quarkus.log file (captured Polaris stdout/stderr) + project.layout.buildDirectory.get().asFile.resolve("quarkus.log").delete() + } + // This property is not honored in a per-profile application.properties file, + // so we need to set it here. + systemProperty("quarkus.log.file.path", logsDir.resolve("polaris.log").absolutePath) + // For Spark integration tests + addSparkJvmOptions() +} diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogBaseIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogBaseIT.java new file mode 100644 index 0000000000..575436e33b --- /dev/null +++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogBaseIT.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.spark.quarkus.it; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import io.quarkus.test.junit.QuarkusIntegrationTest; +import java.util.Arrays; +import java.util.Map; +import org.apache.iceberg.exceptions.BadRequestException; +import org.apache.iceberg.spark.SupportsReplaceView; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.NoSuchViewException; +import org.apache.spark.sql.connector.catalog.*; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * This integration directly performs operations using the SparkCatalog instance, instead of going + * through Spark SQL interface. This provides a more direct testing capability against the Polaris + * SparkCatalog operations, some operations like listNamespaces under a namespace can not be + * triggered through a SQL interface directly with Spark. 
+ */ +@QuarkusIntegrationTest +public abstract class SparkCatalogBaseIT extends SparkIntegrationBase { + private static StructType schema = new StructType().add("id", "long").add("name", "string"); + protected StagingTableCatalog tableCatalog = null; + protected SupportsNamespaces namespaceCatalog = null; + protected ViewCatalog viewCatalog = null; + protected SupportsReplaceView replaceViewCatalog = null; + + @BeforeEach + protected void loadCatalogs() { + Preconditions.checkArgument(spark != null, "No active spark found"); + Preconditions.checkArgument(catalogName != null, "No catalogName found"); + CatalogPlugin catalogPlugin = spark.sessionState().catalogManager().catalog(catalogName); + tableCatalog = (StagingTableCatalog) catalogPlugin; + namespaceCatalog = (SupportsNamespaces) catalogPlugin; + viewCatalog = (ViewCatalog) catalogPlugin; + replaceViewCatalog = (SupportsReplaceView) catalogPlugin; + } + + @Test + void testNamespaceOperations() throws Exception { + String[][] lv1ns = new String[][] {{"l1ns1"}, {"l1ns2"}}; + String[][] lv2ns1 = new String[][] {{"l1ns1", "l2ns1"}, {"l1ns1", "l2ns2"}}; + String[][] lv2ns2 = new String[][] {{"l1ns2", "l2ns3"}}; + + // create the namespaces + for (String[] namespace : lv1ns) { + namespaceCatalog.createNamespace(namespace, Maps.newHashMap()); + } + for (String[] namespace : lv2ns1) { + namespaceCatalog.createNamespace(namespace, Maps.newHashMap()); + } + for (String[] namespace : lv2ns2) { + namespaceCatalog.createNamespace(namespace, Maps.newHashMap()); + } + + // list namespaces under root + String[][] lv1nsResult = namespaceCatalog.listNamespaces(); + assertThat(lv1nsResult.length).isEqualTo(lv1ns.length); + for (String[] namespace : lv1ns) { + assertThat(Arrays.asList(lv1nsResult)).contains(namespace); + } + // list namespace under l1ns1 + String[][] lv2ns1Result = namespaceCatalog.listNamespaces(lv1ns[0]); + assertThat(lv2ns1Result.length).isEqualTo(lv2ns1.length); + for (String[] namespace : lv2ns1) { + assertThat(Arrays.asList(lv2ns1Result)).contains(namespace); + } + // list namespace under l1ns2 + String[][] lv2ns2Result = namespaceCatalog.listNamespaces(lv1ns[1]); + assertThat(lv2ns2Result.length).isEqualTo(lv2ns2.length); + for (String[] namespace : lv2ns2) { + assertThat(Arrays.asList(lv2ns2Result)).contains(namespace); + } + // no namespace under l1ns2.l2ns3 + assertThat(namespaceCatalog.listNamespaces(lv2ns2[0]).length).isEqualTo(0); + + // drop the nested namespace under lv1ns[1] + namespaceCatalog.dropNamespace(lv2ns2[0], true); + assertThat(namespaceCatalog.listNamespaces(lv1ns[1]).length).isEqualTo(0); + namespaceCatalog.dropNamespace(lv1ns[1], true); + assertThatThrownBy(() -> namespaceCatalog.listNamespaces(lv1ns[1])) + .isInstanceOf(NoSuchNamespaceException.class); + + // directly drop lv1ns[0] should fail + assertThatThrownBy(() -> namespaceCatalog.dropNamespace(lv1ns[0], true)) + .isInstanceOf(BadRequestException.class); + for (String[] namespace : lv2ns1) { + namespaceCatalog.dropNamespace(namespace, true); + } + namespaceCatalog.dropNamespace(lv1ns[0], true); + + // no more namespace available + assertThat(namespaceCatalog.listNamespaces().length).isEqualTo(0); + } + + @Test + void testAlterNamespace() throws Exception { + String[] namespace = new String[] {"ns1"}; + Map metadata = Maps.newHashMap(); + metadata.put("owner", "user1"); + + namespaceCatalog.createNamespace(namespace, metadata); + assertThat(namespaceCatalog.loadNamespaceMetadata(namespace)) + .contains(Map.entry("owner", "user1")); + + 
namespaceCatalog.alterNamespace(namespace, NamespaceChange.setProperty("owner", "new-user")); + assertThat(namespaceCatalog.loadNamespaceMetadata(namespace)) + .contains(Map.entry("owner", "new-user")); + + // drop the namespace + namespaceCatalog.dropNamespace(namespace, true); + } + + @Test + void testBasicViewOperations() throws Exception { + String[] namespace = new String[] {"ns"}; + namespaceCatalog.createNamespace(namespace, Maps.newHashMap()); + + Identifier viewIdentifier = Identifier.of(namespace, "test-view"); + String viewSql = "select id from test-table where id < 3"; + viewCatalog.createView( + viewIdentifier, + viewSql, + catalogName, + namespace, + schema, + new String[0], + new String[0], + new String[0], + Maps.newHashMap()); + + // load the view + View view = viewCatalog.loadView(viewIdentifier); + assertThat(view.query()).isEqualTo(viewSql); + assertThat(view.schema()).isEqualTo(schema); + + // alter the view properties + viewCatalog.alterView(viewIdentifier, ViewChange.setProperty("owner", "user1")); + view = viewCatalog.loadView(viewIdentifier); + assertThat(view.properties()).contains(Map.entry("owner", "user1")); + + // rename the view + Identifier newIdentifier = Identifier.of(namespace, "new-view"); + viewCatalog.renameView(viewIdentifier, newIdentifier); + assertThatThrownBy(() -> viewCatalog.loadView(viewIdentifier)) + .isInstanceOf(NoSuchViewException.class); + view = viewCatalog.loadView(newIdentifier); + assertThat(view.query()).isEqualTo(viewSql); + assertThat(view.schema()).isEqualTo(schema); + + // replace the view + String newSql = "select id from test-table where id == 3"; + Map properties = Maps.newHashMap(); + properties.put("owner", "test-user"); + replaceViewCatalog.replaceView( + newIdentifier, + newSql, + catalogName, + namespace, + schema, + new String[0], + new String[0], + new String[0], + properties); + view = viewCatalog.loadView(newIdentifier); + assertThat(view.query()).isEqualTo(newSql); + assertThat(view.properties()).contains(Map.entry("owner", "test-user")); + + // drop the view + viewCatalog.dropView(newIdentifier); + assertThatThrownBy(() -> viewCatalog.loadView(newIdentifier)) + .isInstanceOf(NoSuchViewException.class); + } + + @Test + void testListViews() throws Exception { + String[] l1ns = new String[] {"ns"}; + namespaceCatalog.createNamespace(l1ns, Maps.newHashMap()); + + // create a new namespace under the default NS + String[] l2ns = new String[] {"ns", "nsl2"}; + namespaceCatalog.createNamespace(l2ns, Maps.newHashMap()); + // create one view under l1 + String view1Name = "test-view1"; + String view1SQL = "select id from test-table where id >= 3"; + viewCatalog.createView( + Identifier.of(l1ns, view1Name), + view1SQL, + catalogName, + l1ns, + schema, + new String[0], + new String[0], + new String[0], + Maps.newHashMap()); + // create two views under the l2 namespace + String[] nsl2ViewNames = new String[] {"test-view2", "test-view3"}; + String[] nsl2ViewSQLs = + new String[] { + "select id from test-table where id == 3", "select id from test-table where id < 3" + }; + for (int i = 0; i < nsl2ViewNames.length; i++) { + viewCatalog.createView( + Identifier.of(l2ns, nsl2ViewNames[i]), + nsl2ViewSQLs[i], + catalogName, + l2ns, + schema, + new String[0], + new String[0], + new String[0], + Maps.newHashMap()); + } + // list views under l1ns + Identifier[] l1Views = viewCatalog.listViews(l1ns); + assertThat(l1Views.length).isEqualTo(1); + assertThat(l1Views[0].name()).isEqualTo(view1Name); + + // list views under l2ns + Identifier[] 
l2Views = viewCatalog.listViews(l2ns); + assertThat(l2Views.length).isEqualTo(nsl2ViewSQLs.length); + for (String name : nsl2ViewNames) { + assertThat(Arrays.asList(l2Views)).contains(Identifier.of(l2ns, name)); + } + + // drop namespace fails since there are views under it + assertThatThrownBy(() -> namespaceCatalog.dropNamespace(l2ns, true)) + .isInstanceOf(BadRequestException.class); + // drop the views + for (String name : nsl2ViewNames) { + viewCatalog.dropView(Identifier.of(l2ns, name)); + } + namespaceCatalog.dropNamespace(l2ns, true); + viewCatalog.dropView(Identifier.of(l1ns, view1Name)); + namespaceCatalog.dropNamespace(l1ns, true); + } +} diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogIcebergIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogIcebergIT.java new file mode 100644 index 0000000000..f3c411df23 --- /dev/null +++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogIcebergIT.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.spark.quarkus.it; + +import io.quarkus.test.junit.QuarkusIntegrationTest; +import org.apache.spark.sql.SparkSession; + +@QuarkusIntegrationTest +public class SparkCatalogIcebergIT extends SparkCatalogBaseIT { + /** Initialize the spark catalog to use the iceberg spark catalog. 
*/ + @Override + protected SparkSession.Builder withCatalog(SparkSession.Builder builder, String catalogName) { + return builder + .config( + String.format("spark.sql.catalog.%s", catalogName), + "org.apache.iceberg.spark.SparkCatalog") + .config("spark.sql.warehouse.dir", warehouseDir.toString()) + .config(String.format("spark.sql.catalog.%s.type", catalogName), "rest") + .config( + String.format("spark.sql.catalog.%s.uri", catalogName), + endpoints.catalogApiEndpoint().toString()) + .config(String.format("spark.sql.catalog.%s.warehouse", catalogName), catalogName) + .config(String.format("spark.sql.catalog.%s.scope", catalogName), "PRINCIPAL_ROLE:ALL") + .config( + String.format("spark.sql.catalog.%s.header.realm", catalogName), endpoints.realmId()) + .config(String.format("spark.sql.catalog.%s.token", catalogName), sparkToken) + .config(String.format("spark.sql.catalog.%s.s3.access-key-id", catalogName), "fakekey") + .config( + String.format("spark.sql.catalog.%s.s3.secret-access-key", catalogName), "fakesecret") + .config(String.format("spark.sql.catalog.%s.s3.region", catalogName), "us-west-2"); + } +} diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogPolarisIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogPolarisIT.java new file mode 100644 index 0000000000..97a4c222db --- /dev/null +++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkCatalogPolarisIT.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.spark.quarkus.it; + +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +public class SparkCatalogPolarisIT extends SparkCatalogBaseIT {} diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIT.java new file mode 100644 index 0000000000..f9af2609d0 --- /dev/null +++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIT.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.spark.quarkus.it; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.quarkus.test.junit.QuarkusIntegrationTest; +import java.util.List; +import org.junit.jupiter.api.Test; + +@QuarkusIntegrationTest +public class SparkIT extends SparkIntegrationBase { + @Test + public void testNamespaces() { + List namespaces = sql("SHOW NAMESPACES"); + assertThat(namespaces.size()).isEqualTo(0); + + String[] l1NS = new String[] {"l1ns1", "l1ns2"}; + for (String ns : l1NS) { + sql("CREATE NAMESPACE %s", ns); + } + namespaces = sql("SHOW NAMESPACES"); + assertThat(namespaces.size()).isEqualTo(2); + for (String ns : l1NS) { + assertThat(namespaces).contains(new Object[] {ns}); + } + String l2ns = "l2ns"; + // create a nested namespace + sql("CREATE NAMESPACE %s.%s", l1NS[0], l2ns); + // spark show namespace only shows + namespaces = sql("SHOW NAMESPACES"); + assertThat(namespaces.size()).isEqualTo(2); + + // can not drop l1NS before the nested namespace is dropped + assertThatThrownBy(() -> sql("DROP NAMESPACE %s", l1NS[0])) + .hasMessageContaining(String.format("Namespace %s is not empty", l1NS[0])); + sql("DROP NAMESPACE %s.%s", l1NS[0], l2ns); + + for (String ns : l1NS) { + sql("DROP NAMESPACE %s", ns); + } + + // no namespace available after all drop + namespaces = sql("SHOW NAMESPACES"); + assertThat(namespaces.size()).isEqualTo(0); + } + + @Test + public void testCreatDropView() { + String namespace = "ns"; + // create namespace ns + sql("CREATE NAMESPACE %s", namespace); + sql("USE %s", namespace); + + // create two views under the namespace + String view1Name = "testView1"; + String view2Name = "testView2"; + sql("CREATE VIEW %s AS SELECT 1 AS id", view1Name); + sql("CREATE VIEW %s AS SELECT 10 AS id", view2Name); + List views = sql("SHOW VIEWS"); + assertThat(views.size()).isEqualTo(2); + assertThat(views).contains(new Object[] {namespace, view1Name, false}); + assertThat(views).contains(new Object[] {namespace, view2Name, false}); + + // drop the views + sql("DROP VIEW %s", view1Name); + views = sql("SHOW VIEWS"); + assertThat(views.size()).isEqualTo(1); + assertThat(views).contains(new Object[] {namespace, view2Name, false}); + + sql("DROP VIEW %s", view2Name); + views = sql("SHOW VIEWS"); + assertThat(views.size()).isEqualTo(0); + } + + @Test + public void renameView() { + sql("CREATE NAMESPACE ns"); + sql("USE ns"); + + String viewName = "originalView"; + String renamedView = "renamedView"; + sql("CREATE VIEW %s AS SELECT 1 AS id", viewName); + List views = sql("SHOW VIEWS"); + assertThat(views.size()).isEqualTo(1); + assertThat(views).contains(new Object[] {"ns", viewName, false}); + + sql("ALTER VIEW %s RENAME TO %s", viewName, renamedView); + views = sql("SHOW VIEWS"); + assertThat(views.size()).isEqualTo(1); + assertThat(views).contains(new Object[] {"ns", renamedView, false}); + } +} diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java 
b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java new file mode 100644 index 0000000000..b5006d6a79 --- /dev/null +++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.spark.quarkus.it; + +import com.google.common.collect.ImmutableList; +import com.google.errorprone.annotations.FormatMethod; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.polaris.service.it.ext.PolarisSparkIntegrationTestBase; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +public abstract class SparkIntegrationBase extends PolarisSparkIntegrationTestBase { + + @Override + protected SparkSession.Builder withCatalog(SparkSession.Builder builder, String catalogName) { + return builder + .config( + String.format("spark.sql.catalog.%s", catalogName), + "org.apache.polaris.spark.SparkCatalog") + .config( + "spark.sql.extensions", + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + .config("spark.sql.warehouse.dir", warehouseDir.toString()) + .config(String.format("spark.sql.catalog.%s.type", catalogName), "rest") + .config( + String.format("spark.sql.catalog.%s.uri", catalogName), + endpoints.catalogApiEndpoint().toString()) + .config(String.format("spark.sql.catalog.%s.warehouse", catalogName), catalogName) + .config(String.format("spark.sql.catalog.%s.scope", catalogName), "PRINCIPAL_ROLE:ALL") + .config( + String.format("spark.sql.catalog.%s.header.realm", catalogName), endpoints.realmId()) + .config(String.format("spark.sql.catalog.%s.token", catalogName), sparkToken) + .config(String.format("spark.sql.catalog.%s.s3.access-key-id", catalogName), "fakekey") + .config( + String.format("spark.sql.catalog.%s.s3.secret-access-key", catalogName), "fakesecret") + .config(String.format("spark.sql.catalog.%s.s3.region", catalogName), "us-west-2"); + } + + @Override + protected void cleanupCatalog(String catalogName) { + onSpark("USE " + catalogName); + List namespaces = onSpark("SHOW NAMESPACES").collectAsList(); + for (Row namespace : namespaces) { + // TODO: once all table operations are supported, remove the override of this function + // List tables = onSpark("SHOW TABLES IN " + namespace.getString(0)).collectAsList(); + // for (Row table : tables) { + // onSpark("DROP TABLE " + namespace.getString(0) + "." + table.getString(1)); + // } + List views = onSpark("SHOW VIEWS IN " + namespace.getString(0)).collectAsList(); + for (Row view : views) { + onSpark("DROP VIEW " + namespace.getString(0) + "." 
+ view.getString(1)); + } + onSpark("DROP NAMESPACE " + namespace.getString(0)); + } + + managementApi.deleteCatalog(catalogName); + } + + @FormatMethod + protected List sql(String query, Object... args) { + List rows = spark.sql(String.format(query, args)).collectAsList(); + if (rows.isEmpty()) { + return ImmutableList.of(); + } + return rowsToJava(rows); + } + + protected List rowsToJava(List rows) { + return rows.stream().map(this::toJava).collect(Collectors.toList()); + } + + private Object[] toJava(Row row) { + return IntStream.range(0, row.size()) + .mapToObj( + pos -> { + if (row.isNullAt(pos)) { + return null; + } + + Object value = row.get(pos); + if (value instanceof Row) { + return toJava((Row) value); + } else if (value instanceof scala.collection.Seq) { + return row.getList(pos); + } else if (value instanceof scala.collection.Map) { + return row.getJavaMap(pos); + } else { + return value; + } + }) + .toArray(Object[]::new); + } +} diff --git a/plugins/spark/v3.5/integration/src/intTest/resources/META-INF/services/org.apache.polaris.service.it.ext.PolarisServerManager b/plugins/spark/v3.5/integration/src/intTest/resources/META-INF/services/org.apache.polaris.service.it.ext.PolarisServerManager new file mode 100644 index 0000000000..3c3881857b --- /dev/null +++ b/plugins/spark/v3.5/integration/src/intTest/resources/META-INF/services/org.apache.polaris.service.it.ext.PolarisServerManager @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +org.apache.polaris.service.quarkus.it.QuarkusServerManager diff --git a/plugins/spark/v3.5/build.gradle.kts b/plugins/spark/v3.5/spark/build.gradle.kts similarity index 94% rename from plugins/spark/v3.5/build.gradle.kts rename to plugins/spark/v3.5/spark/build.gradle.kts index df37fa229e..ddf27ce1f9 100644 --- a/plugins/spark/v3.5/build.gradle.kts +++ b/plugins/spark/v3.5/spark/build.gradle.kts @@ -24,17 +24,6 @@ plugins { alias(libs.plugins.jandex) } -fun getAndUseScalaVersionForProject(): String { - val sparkScala = project.name.split("-").last().split("_") - - val scalaVersion = sparkScala[1] - - // direct the build to build/ to avoid potential collision problem - project.layout.buildDirectory.set(layout.buildDirectory.dir(scalaVersion).get()) - - return scalaVersion -} - // get version information val sparkMajorVersion = "3.5" val scalaVersion = getAndUseScalaVersionForProject() @@ -141,6 +130,7 @@ tasks.register("createPolarisSparkJar") { mergeServiceFiles() // pack both the source code and dependencies + from(sourceSets.main.get().output) configurations = listOf(project.configurations.runtimeClasspath.get()) diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisCatalog.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisCatalog.java similarity index 100% rename from plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisCatalog.java rename to plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisCatalog.java diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java similarity index 100% rename from plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java rename to plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java similarity index 100% rename from plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java rename to plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java similarity index 100% rename from plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java rename to plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/CreateGenericTableRESTRequest.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/rest/CreateGenericTableRESTRequest.java similarity index 100% rename from plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/CreateGenericTableRESTRequest.java rename to plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/rest/CreateGenericTableRESTRequest.java diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/LoadGenericTableRESTResponse.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/rest/LoadGenericTableRESTResponse.java similarity index 100% rename from plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/LoadGenericTableRESTResponse.java rename to 
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/rest/LoadGenericTableRESTResponse.java diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java similarity index 100% rename from plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java rename to plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java similarity index 100% rename from plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java rename to plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java diff --git a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java similarity index 100% rename from plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java rename to plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java diff --git a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/PolarisInMemoryCatalog.java b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/PolarisInMemoryCatalog.java similarity index 100% rename from plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/PolarisInMemoryCatalog.java rename to plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/PolarisInMemoryCatalog.java diff --git a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java similarity index 100% rename from plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java rename to plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java diff --git a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java similarity index 100% rename from plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java rename to plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java diff --git a/quarkus/spark-tests/build.gradle.kts b/quarkus/spark-tests/build.gradle.kts index 18cc9d585f..0e83a7f279 100644 --- a/quarkus/spark-tests/build.gradle.kts +++ b/quarkus/spark-tests/build.gradle.kts @@ -88,32 +88,3 @@ tasks.named("intTest").configure { // For Spark integration tests addSparkJvmOptions() } - -/** - * Adds the JPMS options required for Spark to run on Java 17, taken from the - * `DEFAULT_MODULE_OPTIONS` constant in `org.apache.spark.launcher.JavaModuleOptions`. 
- */ -fun JavaForkOptions.addSparkJvmOptions() { - jvmArgs = - (jvmArgs ?: emptyList()) + - listOf( - // Spark 3.3+ - "-XX:+IgnoreUnrecognizedVMOptions", - "--add-opens=java.base/java.lang=ALL-UNNAMED", - "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED", - "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED", - "--add-opens=java.base/java.io=ALL-UNNAMED", - "--add-opens=java.base/java.net=ALL-UNNAMED", - "--add-opens=java.base/java.nio=ALL-UNNAMED", - "--add-opens=java.base/java.util=ALL-UNNAMED", - "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED", - "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED", - "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", - "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED", - "--add-opens=java.base/sun.security.action=ALL-UNNAMED", - "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED", - "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED", - // Spark 3.4+ - "-Djdk.reflect.useDirectMethodHandle=false", - ) -} diff --git a/settings.gradle.kts b/settings.gradle.kts index 3d658130d8..cbfbc3a269 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -71,12 +71,21 @@ for (sparkVersion in sparkVersions) { val scalaVersions = sparkScalaVersions["scalaVersions"].toString().split(",").map { it.trim() } var first = true for (scalaVersion in scalaVersions) { - val artifactId = "polaris-spark-${sparkVersion}_${scalaVersion}" - polarisProject(artifactId, file("${polarisSparkDir}/v${sparkVersion}")) + val sparkArtifactId = "polaris-spark-${sparkVersion}_${scalaVersion}" + val sparkIntArtifactId = "polaris-spark-integration-${sparkVersion}_${scalaVersion}" + polarisProject( + "polaris-spark-${sparkVersion}_${scalaVersion}", + file("${polarisSparkDir}/v${sparkVersion}/spark"), + ) + polarisProject( + "polaris-spark-integration-${sparkVersion}_${scalaVersion}", + file("${polarisSparkDir}/v${sparkVersion}/integration"), + ) if (first) { first = false } else { - noSourceChecksProjects.add(":$artifactId") + noSourceChecksProjects.add(":$sparkArtifactId") + noSourceChecksProjects.add(":$sparkIntArtifactId") } // Skip all duplicated spark client projects while using Intelij IDE. // This is to avoid problems during dependency analysis and sync when