diff --git a/plugins/pluginlibs.versions.toml b/plugins/pluginlibs.versions.toml
new file mode 100644
index 0000000000..0a4a515e5c
--- /dev/null
+++ b/plugins/pluginlibs.versions.toml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+[versions]
+iceberg = "1.8.1"
+spark35 = "3.5.5"
diff --git a/plugins/spark/README.md b/plugins/spark/README.md
new file mode 100644
index 0000000000..6386a914c0
--- /dev/null
+++ b/plugins/spark/README.md
@@ -0,0 +1,40 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Polaris Spark Plugin
+
+The Polaris Spark plugin provides a SparkCatalog class, which communicates with the Polaris
+REST endpoints and provides implementations of Apache Spark's
+[TableCatalog](https://github.com/apache/spark/blob/v3.5.5/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java),
+[SupportsNamespaces](https://github.com/apache/spark/blob/v3.5.5/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java),
+and [ViewCatalog](https://github.com/apache/spark/blob/v3.5.5/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java) interfaces.
+
+Right now, the plugin only supports Spark 3.5 with Scala 2.12 and 2.13,
+and depends on iceberg-spark-runtime 1.8.1.
+
+# Build Plugin Jar
+A `createPolarisSparkJar` task is added to build a jar for the Polaris Spark plugin. The jar is named:
+`polaris-iceberg-<iceberg_version>-spark-runtime-<spark_major_version>_<scala_version>.jar`
+
+Building the Polaris project produces client jars for both Scala 2.12 and 2.13, and CI runs the Spark
+client tests for both Scala versions as well.
+
+The jar can also be built alone for a specific Scala version using the target `:polaris-spark-3.5_<scala_version>`. For example:
+- `./gradlew :polaris-spark-3.5_2.12:createPolarisSparkJar` - Build a jar for the Polaris Spark plugin with Scala version 2.12.
+The resulting jar is located at `plugins/spark/build/<scala_version>/libs` after the build.
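For orientation, a catalog shipped this way is wired into Spark through the usual `spark.sql.catalog.<name>` configuration. The sketch below is illustrative only and not part of this change: the catalog name `polaris` and the local-mode setup are assumptions, and with the stub introduced in this PR every catalog operation still throws `UnsupportedOperationException`.

```java
import org.apache.spark.sql.SparkSession;

public class PolarisSparkCatalogExample {
  public static void main(String[] args) {
    // Assumes the shaded jar produced by createPolarisSparkJar, e.g.
    // plugins/spark/build/2.12/libs/polaris-iceberg-1.8.1-spark-runtime-3.5_2.12.jar,
    // is already on the driver/executor classpath (for example via --jars).
    SparkSession spark =
        SparkSession.builder()
            .master("local[1]")
            // Register the plugin's catalog implementation under the name "polaris".
            .config("spark.sql.catalog.polaris", "org.apache.polaris.spark.SparkCatalog")
            .getOrCreate();

    // With this change the catalog is only a stub, so any operation against it,
    // such as listing namespaces, fails with UnsupportedOperationException.
    spark.sql("SHOW NAMESPACES IN polaris").show();
  }
}
```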
diff --git a/plugins/spark/spark-scala.properties b/plugins/spark/spark-scala.properties
new file mode 100644
index 0000000000..2ed71b574f
--- /dev/null
+++ b/plugins/spark/spark-scala.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+sparkVersions=3.5
+
+scalaVersions=2.12,2.13
diff --git a/plugins/spark/v3.5/build.gradle.kts b/plugins/spark/v3.5/build.gradle.kts
new file mode 100644
index 0000000000..36ca6d5285
--- /dev/null
+++ b/plugins/spark/v3.5/build.gradle.kts
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
+
+plugins {
+  id("polaris-client")
+  alias(libs.plugins.jandex)
+}
+
+fun getAndUseScalaVersionForProject(): String {
+  val sparkScala = project.name.split("-").last().split("_")
+
+  val scalaVersion = sparkScala[1]
+
+  // direct the build output to build/<scalaVersion> to avoid potential collision problems
+  project.layout.buildDirectory.set(layout.buildDirectory.dir(scalaVersion).get())
+
+  return scalaVersion
+}
+
+// get version information
+val sparkMajorVersion = "3.5"
+val scalaVersion = getAndUseScalaVersionForProject()
+val icebergVersion = pluginlibs.versions.iceberg.get()
+val spark35Version = pluginlibs.versions.spark35.get()
+
+dependencies {
+  implementation(project(":polaris-api-iceberg-service")) {
+    // exclude the iceberg and jackson dependencies; use the
+    // dependencies packed in the iceberg-spark dependency
+    exclude("org.apache.iceberg", "*")
+    exclude("com.fasterxml.jackson.core", "*")
+  }
+
+  implementation(
+    "org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}:${icebergVersion}"
+  )
+
+  compileOnly("org.apache.spark:spark-sql_${scalaVersion}:${spark35Version}") {
+    // exclude log4j dependencies
+    exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
+    exclude("org.apache.logging.log4j", "log4j-api")
+    exclude("org.apache.logging.log4j", "log4j-1.2-api")
+    exclude("org.slf4j", "jul-to-slf4j")
+  }
+
+  testImplementation(platform(libs.junit.bom))
+  testImplementation("org.junit.jupiter:junit-jupiter")
+  testImplementation(libs.assertj.core)
+  testImplementation(libs.mockito.core)
+
+  testImplementation(
+    "org.apache.iceberg:iceberg-spark-runtime-3.5_${scalaVersion}:${icebergVersion}"
+  )
+  testImplementation("org.apache.spark:spark-sql_${scalaVersion}:${spark35Version}") {
+    // exclude log4j dependencies
+    exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
+    exclude("org.apache.logging.log4j", "log4j-api")
+    exclude("org.apache.logging.log4j", "log4j-1.2-api")
+    exclude("org.slf4j", "jul-to-slf4j")
+  }
+}
+
+tasks.register<ShadowJar>("createPolarisSparkJar") {
+  archiveClassifier = null
+  archiveBaseName =
+    "polaris-iceberg-${icebergVersion}-spark-runtime-${sparkMajorVersion}_${scalaVersion}"
+  isZip64 = true
+
+  dependencies { exclude("META-INF/**") }
+
+  // pack both the source code and dependencies
+  from(sourceSets.main.get().output)
+  configurations = listOf(project.configurations.runtimeClasspath.get())
+
+  mergeServiceFiles()
+
+  // Optimization: Minimize the JAR (remove unused classes from dependencies)
+  // The iceberg-spark-runtime dependency is always packaged along with our polaris-spark plugin,
+  // therefore it is excluded from the optimization.
+  minimize { exclude(dependency("org.apache.iceberg:iceberg-spark-runtime-*.*")) }
+}
+
+tasks.withType(Jar::class).named("sourcesJar") { dependsOn("createPolarisSparkJar") }
diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java
new file mode 100644
index 0000000000..2ec0450a04
--- /dev/null
+++ b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.*;
+import org.apache.spark.sql.connector.catalog.*;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class SparkCatalog implements TableCatalog, SupportsNamespaces, ViewCatalog {
+  private static final Set<String> DEFAULT_NS_KEYS = ImmutableSet.of(TableCatalog.PROP_OWNER);
+  private String catalogName = null;
+  private org.apache.iceberg.spark.SparkCatalog icebergsSparkCatalog = null;
+
+  // TODO: Add Polaris Specific REST Catalog
+
+  @Override
+  public String name() {
+    return catalogName;
+  }
+
+  @Override
+  public void initialize(String name, CaseInsensitiveStringMap options) {
+    this.catalogName = name;
+  }
+
+  @Override
+  public Table loadTable(Identifier ident) throws NoSuchTableException {
+    throw new UnsupportedOperationException("loadTable");
+  }
+
+  @Override
+  public Table createTable(
+      Identifier ident, StructType schema, Transform[] transforms, Map<String, String> properties)
+      throws TableAlreadyExistsException {
+    throw new UnsupportedOperationException("createTable");
+  }
+
+  @Override
+  public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
+    throw new UnsupportedOperationException("alterTable");
+  }
+
+  @Override
+  public boolean dropTable(Identifier ident) {
+    throw new UnsupportedOperationException("dropTable");
+  }
+
+  @Override
+  public void renameTable(Identifier from, Identifier to)
+      throws NoSuchTableException, TableAlreadyExistsException {
+    throw new UnsupportedOperationException("renameTable");
+  }
+
+  @Override
+  public Identifier[] listTables(String[] namespace) {
+    throw new UnsupportedOperationException("listTables");
+  }
+
+  @Override
+  public String[] defaultNamespace() {
+    throw new UnsupportedOperationException("defaultNamespace");
+  }
+
+  @Override
+  public String[][] listNamespaces() {
+    throw new UnsupportedOperationException("listNamespaces");
+  }
+
+  @Override
+  public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException {
+    throw new UnsupportedOperationException("listNamespaces");
+  }
+
+  @Override
+  public Map<String, String> loadNamespaceMetadata(String[] namespace)
+      throws NoSuchNamespaceException {
+    throw new UnsupportedOperationException("loadNamespaceMetadata");
+  }
+
+  @Override
+  public void createNamespace(String[] namespace, Map<String, String> metadata)
+      throws NamespaceAlreadyExistsException {
+    throw new UnsupportedOperationException("createNamespace");
+  }
+
+  @Override
+  public void alterNamespace(String[] namespace, NamespaceChange... changes)
+      throws NoSuchNamespaceException {
+    throw new UnsupportedOperationException("alterNamespace");
+  }
+
+  @Override
+  public boolean dropNamespace(String[] namespace, boolean cascade)
+      throws NoSuchNamespaceException {
+    throw new UnsupportedOperationException("dropNamespace");
+  }
+
+  @Override
+  public Identifier[] listViews(String... namespace) {
+    throw new UnsupportedOperationException("listViews");
+  }
+
+  @Override
+  public View loadView(Identifier ident) throws NoSuchViewException {
+    throw new UnsupportedOperationException("loadView");
+  }
+
+  @Override
+  public View createView(
+      Identifier ident,
+      String sql,
+      String currentCatalog,
+      String[] currentNamespace,
+      StructType schema,
+      String[] queryColumnNames,
+      String[] columnAliases,
+      String[] columnComments,
+      Map<String, String> properties)
+      throws ViewAlreadyExistsException, NoSuchNamespaceException {
+    throw new UnsupportedOperationException("createView");
+  }
+
+  @Override
+  public View alterView(Identifier ident, ViewChange... changes)
+      throws NoSuchViewException, IllegalArgumentException {
+    throw new UnsupportedOperationException("alterView");
+  }
+
+  @Override
+  public boolean dropView(Identifier ident) {
+    throw new UnsupportedOperationException("dropView");
+  }
+
+  @Override
+  public void renameView(Identifier fromIdentifier, Identifier toIdentifier)
+      throws NoSuchViewException, ViewAlreadyExistsException {
+    throw new UnsupportedOperationException("renameView");
+  }
+}
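The unused `icebergsSparkCatalog` field and the TODO above hint at the intended follow-up: delegating plain Iceberg table handling to Iceberg's own Spark catalog while Polaris-specific behavior is layered on top. A minimal sketch of what that delegation could look like inside `SparkCatalog` follows; this is an assumption about the future design, not part of this change.

```java
// Hypothetical follow-up inside org.apache.polaris.spark.SparkCatalog; not part of this change.
@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
  this.catalogName = name;
  // Assumed design: initialize Iceberg's Spark catalog with the same options
  // so it can talk to the Polaris REST endpoints for Iceberg tables.
  this.icebergsSparkCatalog = new org.apache.iceberg.spark.SparkCatalog();
  this.icebergsSparkCatalog.initialize(name, options);
}

@Override
public boolean dropTable(Identifier ident) {
  // Delegate plain Iceberg table operations to the wrapped catalog.
  return this.icebergsSparkCatalog.dropTable(ident);
}
```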
diff --git a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
new file mode 100644
index 0000000000..50c1e645a5
--- /dev/null
+++ b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import static org.apache.iceberg.CatalogProperties.CATALOG_IMPL;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class SparkCatalogTest {
+  private SparkCatalog catalog;
+  private String catalogName;
+
+  @BeforeEach
+  public void setup() {
+    catalogName = "test_" + UUID.randomUUID();
+    Map<String, String> catalogConfig = Maps.newHashMap();
+    catalogConfig.put(CATALOG_IMPL, "org.apache.iceberg.inmemory.InMemoryCatalog");
+    catalogConfig.put("cache-enabled", "false");
+    catalog = new SparkCatalog();
+    catalog.initialize(catalogName, new CaseInsensitiveStringMap(catalogConfig));
+  }
+
+  @Test
+  public void testUnsupportedOperations() {
+    String[] namespace = new String[] {"ns1"};
+    Identifier identifier = Identifier.of(namespace, "table1");
+    Identifier new_identifier = Identifier.of(namespace, "table2");
+    // table methods
+    assertThatThrownBy(() -> catalog.loadTable(identifier))
+        .isInstanceOf(UnsupportedOperationException.class);
+    assertThatThrownBy(
+            () -> catalog.createTable(identifier, Mockito.mock(StructType.class), null, null))
+        .isInstanceOf(UnsupportedOperationException.class);
+    assertThatThrownBy(() -> catalog.alterTable(identifier))
+        .isInstanceOf(UnsupportedOperationException.class);
+    assertThatThrownBy(() -> catalog.dropTable(identifier))
+        .isInstanceOf(UnsupportedOperationException.class);
+    assertThatThrownBy(() -> catalog.renameTable(identifier, new_identifier))
+        .isInstanceOf(UnsupportedOperationException.class);
+    assertThatThrownBy(() -> catalog.listTables(namespace))
+        .isInstanceOf(UnsupportedOperationException.class);
+
+    // namespace methods
+    assertThatThrownBy(() -> catalog.loadNamespaceMetadata(namespace))
+        .isInstanceOf(UnsupportedOperationException.class);
+    assertThatThrownBy(() -> catalog.listNamespaces())
+        .isInstanceOf(UnsupportedOperationException.class);
+    assertThatThrownBy(() -> catalog.listNamespaces(namespace))
+        .isInstanceOf(UnsupportedOperationException.class);
+    assertThatThrownBy(() -> catalog.createNamespace(namespace, null))
+        .isInstanceOf(UnsupportedOperationException.class);
+    assertThatThrownBy(() -> catalog.alterNamespace(namespace))
+        .isInstanceOf(UnsupportedOperationException.class);
+    assertThatThrownBy(() -> catalog.dropNamespace(namespace, false))
+        .isInstanceOf(UnsupportedOperationException.class);
+
+    // view methods
+    assertThatThrownBy(() -> catalog.listViews(namespace))
+        .isInstanceOf(UnsupportedOperationException.class);
+    assertThatThrownBy(() -> catalog.loadView(identifier))
+        .isInstanceOf(UnsupportedOperationException.class);
+    assertThatThrownBy(
+            () -> catalog.createView(identifier, null, null, null, null, null, null, null, null))
+        .isInstanceOf(UnsupportedOperationException.class);
+    assertThatThrownBy(() -> catalog.alterView(identifier))
+        .isInstanceOf(UnsupportedOperationException.class);
+    assertThatThrownBy(() -> catalog.dropView(identifier))
+        .isInstanceOf(UnsupportedOperationException.class);
+    assertThatThrownBy(() -> catalog.renameView(identifier, new_identifier))
+        .isInstanceOf(UnsupportedOperationException.class);
+  }
+}
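Assuming the standard Gradle `test` task is applied to these projects by the `polaris-client` convention plugin (an assumption, not stated in this change), the suite above can be run for a single Scala version with, for example, `./gradlew :polaris-spark-3.5_2.12:test`, mirroring the per-version build targets described in the README.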
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 707592ead1..3884bea5bc 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -57,6 +57,21 @@ loadProperties(file("gradle/projects.main.properties")).forEach { name, director
   polarisProject(name as String, file(directory as String))
 }
 
+// load the polaris spark plugin projects
+val polarisSparkDir = "plugins/spark"
+val sparkScalaVersions = loadProperties(file("${polarisSparkDir}/spark-scala.properties"))
+val sparkVersions = sparkScalaVersions["sparkVersions"].toString().split(",").map { it.trim() }
+
+for (sparkVersion in sparkVersions) {
+  val scalaVersions = sparkScalaVersions["scalaVersions"].toString().split(",").map { it.trim() }
+  for (scalaVersion in scalaVersions) {
+    polarisProject(
+      "polaris-spark-${sparkVersion}_${scalaVersion}",
+      file("${polarisSparkDir}/v${sparkVersion}"),
+    )
+  }
+}
+
 pluginManagement {
   repositories {
     mavenCentral() // prefer Maven Central, in case Gradle's repo has issues
@@ -72,6 +87,11 @@ dependencyResolutionManagement {
   }
 }
 
+dependencyResolutionManagement {
+  // version catalog used by the polaris plugin code, such as polaris-spark-3.5
+  versionCatalogs { create("pluginlibs") { from(files("plugins/pluginlibs.versions.toml")) } }
+}
+
 gradle.beforeProject {
   version = baseVersion
   group = "org.apache.polaris"