[Delta-Iceberg] Fix delta-iceberg jar to not pull in delta-spark and delta-storage jars

Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [X] Other (delta-iceberg)

Resolves delta-io#1903

Previously, the `delta-iceberg` jar was incorrectly including all of the classes from `delta-spark` and `delta-storage`.

You could run

```
wget https://repo1.maven.org/maven2/io/delta/delta-iceberg_2.13/3.0.0rc1/delta-iceberg_2.13-3.0.0rc1.jar
jar tvf delta-iceberg_2.13-3.0.0rc1.jar
```

and see

```
com/databricks/spark/util/MetricDefinitions.class
...
io/delta/storage/internal/ThreadUtils.class
...
org/apache/spark/sql/delta/DeltaLog.class
```

This PR fixes that by updating the SBT assembly configuration of the `iceberg` project:
1) `assemblyExcludedJars`: excludes the jars we don't want (this only applies to jars pulled in via `libraryDependencies`, not via `.dependsOn`)
2) `assemblyMergeStrategy`: discards the remaining unwanted classes by matching on their paths (sketched below)
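
Roughly, the approach looks like the following simplified sketch (allow-list shortened; the actual configuration, with the full allow-lists and debug logging, is in the `build.sbt` diff below):

```
// Simplified sketch -- not the exact config; see the build.sbt changes below.
assembly / assemblyExcludedJars := {
  // Only jars coming from libraryDependencies show up here; .dependsOn(...) classes do not.
  val allowedJars = Set("iceberg-shaded_2.12-3.0.0-SNAPSHOT.jar", "scala-library-2.12.15.jar")
  (assembly / fullClasspath).value.filter(jar => !allowedJars.contains(jar.data.getName))
},
assembly / assemblyMergeStrategy := {
  // Classes that leak in through .dependsOn(spark) are dropped by path prefix.
  case PathList("io", "delta", xs @ _*) => MergeStrategy.discard
  case PathList("com", "databricks", xs @ _*) => MergeStrategy.discard
  case x => (assembly / assemblyMergeStrategy).value(x)
}
```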

Added a new sbt project, `testDeltaIcebergJar`, with a test suite (`JarSuite`). The project depends on the assembled `delta-iceberg` jar, and the suite opens that jar and audits its class entries against an allow-list of package prefixes.
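
As a quick manual spot-check (separate from the automated suite below), one can also open a locally built jar and look for prohibited entries directly. This is just an ad-hoc sketch; the jar filename is an example of a locally published artifact, not a fixed path:

```
// Ad-hoc spot-check in a Scala REPL; the jar filename here is just an example.
import java.util.jar.JarFile
import scala.collection.JavaConverters._

val jar = new JarFile("delta-iceberg_2.12-3.0.0-SNAPSHOT.jar")
jar.entries().asScala
  .map(_.getName)
  .filter(name => name.startsWith("io/delta/") || name.startsWith("com/databricks/"))
  .foreach(println) // should print nothing once this fix is in place
jar.close()
```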

Published the jars locally and ran through a simple end-to-end UniForm example.

```
========== Delta ==========

build/sbt storage/publishM2
build/sbt spark/publishM2
build/sbt iceberg/publishM2

spark-shell --packages io.delta:delta-spark_2.12:3.0.0-SNAPSHOT,io.delta:delta-storage:3.0.0-SNAPSHOT,io.delta:delta-iceberg_2.12:3.0.0-SNAPSHOT --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

val tablePath = "/Users/scott.sandre/uniform_tables/table_000"

sql(s"CREATE TABLE delta.`$tablePath` (col1 INT, col2 INT) USING DELTA TBLPROPERTIES ('delta.universalFormat.enabledFormats'='iceberg')")

sql(s"INSERT INTO delta.`$tablePath` VALUES (1, 1), (2,2), (3, 3)")

sql(s"SELECT * FROM delta.`$tablePath`").show()
+----+----+
|col1|col2|
+----+----+
|   3|   3|
|   2|   2|
|   1|   1|
+----+----+

==========  Iceberg ==========

spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.1 \
	--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
	--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
	--conf spark.sql.catalog.spark_catalog.type=hive \
	--conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
	--conf spark.sql.catalog.local.type=hadoop \
	--conf spark.sql.catalog.local.warehouse=/Users/scott.sandre/iceberg_warehouse

spark.read.format("iceberg").load("/Users/scott.sandre/uniform_tables/table_000").show()
+----+----+
|col1|col2|
+----+----+
|   1|   1|
|   2|   2|
|   3|   3|
+----+----+
```

Fixes a bug where the `delta-iceberg` jar incorrectly included `delta-spark` and `delta-storage` classes.

Closes delta-io#2022

Signed-off-by: Scott Sandre <scott.sandre@databricks.com>
GitOrigin-RevId: 187ca6a09bd423de0fde00bff26e47a88797a2f2
scottsand-db authored and allisonport-db committed Sep 14, 2023
1 parent 51f97b8 commit a6bdc35
Showing 2 changed files with 179 additions and 0 deletions.
73 changes: 73 additions & 0 deletions build.sbt
@@ -321,8 +321,36 @@ val icebergSparkRuntimeArtifactName = {
  s"iceberg-spark-runtime-$expMaj.$expMin"
}

lazy val testDeltaIcebergJar = (project in file("testDeltaIcebergJar"))
  // delta-iceberg depends on delta-spark! So, we need to include it during our test.
  .dependsOn(spark % "test")
  .settings(
    name := "test-delta-iceberg-jar",
    commonSettings,
    skipReleaseSettings,
    exportJars := true,
    Compile / unmanagedJars += (iceberg / assembly).value,
    libraryDependencies ++= Seq(
      "org.apache.hadoop" % "hadoop-client" % hadoopVersion,
      "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
      "org.apache.spark" %% "spark-core" % sparkVersion % "test"
    )
  )

val deltaIcebergSparkIncludePrefixes = Seq(
  // We want everything from this package
  "org/apache/spark/sql/delta/icebergShaded",

  // We only want the files in this package that are built in this project. E.g. we want to
  // exclude org/apache/spark/sql/delta/commands/convert/ConvertTargetFile.class (which comes
  // from the delta-spark project).
  "org/apache/spark/sql/delta/commands/convert/IcebergFileManifest",
  "org/apache/spark/sql/delta/commands/convert/IcebergSchemaUtils",
  "org/apache/spark/sql/delta/commands/convert/IcebergTable"
)

// Build using: build/sbt clean icebergShaded/compile iceberg/compile
// It will fail the first time, just re-run it.
// scalastyle:off println
lazy val iceberg = (project in file("iceberg"))
  .dependsOn(spark % "compile->compile;test->test;provided->provided")
  .settings (
@@ -343,8 +371,53 @@ lazy val iceberg = (project in file("iceberg"))
    assembly / assemblyJarName := s"${name.value}_${scalaBinaryVersion.value}-${version.value}.jar",
    assembly / logLevel := Level.Info,
    assembly / test := {},
    assembly / assemblyExcludedJars := {
      // Note: the input here is only `libraryDependencies` jars, not `.dependsOn(_)` jars.
      val allowedJars = Seq(
        "iceberg-shaded_2.12-3.0.0-SNAPSHOT.jar",
        "scala-library-2.12.15.jar",
        "scala-collection-compat_2.12-2.1.1.jar",
        "caffeine-2.9.3.jar",
        // Note: We are excluding
        // - antlr4-runtime-4.9.3.jar
        // - checker-qual-3.19.0.jar
        // - error_prone_annotations-2.10.0.jar
      )
      val cp = (assembly / fullClasspath).value

      // Return `true` when we want the jar `f` to be excluded from the assembly jar
      cp.filter { f =>
        val doExclude = !allowedJars.contains(f.data.getName)
        println(s"Excluding jar: ${f.data.getName} ? $doExclude")
        doExclude
      }
    },
    assembly / assemblyMergeStrategy := {
      // Project iceberg `dependsOn` spark and accidentally brings it in, along with its
      // compile-time dependencies (like delta-storage). We want these excluded from the
      // delta-iceberg jar.
      case PathList("io", "delta", xs @ _*) =>
        // - delta-storage will bring in classes: io/delta/storage
        // - delta-spark will bring in classes: io/delta/exceptions/, io/delta/implicits,
        //   io/delta/package, io/delta/sql, io/delta/tables
        println(s"Discarding class: io/delta/${xs.mkString("/")}")
        MergeStrategy.discard
      case PathList("com", "databricks", xs @ _*) =>
        // delta-spark will bring in com/databricks/spark/util
        println(s"Discarding class: com/databricks/${xs.mkString("/")}")
        MergeStrategy.discard
      case PathList("org", "apache", "spark", xs @ _*)
          if !deltaIcebergSparkIncludePrefixes.exists { prefix =>
            s"org/apache/spark/${xs.mkString("/")}".startsWith(prefix) } =>
        println(s"Discarding class: org/apache/spark/${xs.mkString("/")}")
        MergeStrategy.discard
      case x =>
        println(s"Including class: $x")
        (assembly / assemblyMergeStrategy).value(x)
    },
    assemblyPackageScala / assembleArtifact := false
  )
// scalastyle:on println

lazy val generateIcebergJarsTask = TaskKey[Unit]("generateIcebergJars", "Generate Iceberg JARs")

106 changes: 106 additions & 0 deletions testDeltaIcebergJar/src/test/scala/JarSuite.scala
@@ -0,0 +1,106 @@
/*
 * Copyright (2023-present) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.File
import java.net.JarURLConnection
import java.util.jar.JarFile

import scala.collection.JavaConverters._

import org.scalatest.funsuite.AnyFunSuite

class JarSuite extends AnyFunSuite {

  val allowedClassPrefixes = Seq(
    // e.g. shadedForDelta/org/apache/iceberg/BaseTable.class
    "shadedForDelta/",
    // e.g. scala/collection/compat/immutable/ArraySeq.class
    // e.g. scala/jdk/CollectionConverters.class
    "scala/",
    // e.g. org/apache/spark/sql/delta/icebergShaded/IcebergTransactionUtils.class
    "org/apache/spark/sql/delta/icebergShaded/",
    // We explicitly include all the /delta/commands/convert classes we want, to ensure we don't
    // accidentally pull in some from the delta-spark package.
    "org/apache/spark/sql/delta/commands/convert/IcebergFileManifest",
    "org/apache/spark/sql/delta/commands/convert/IcebergSchemaUtils",
    "org/apache/spark/sql/delta/commands/convert/IcebergTable",
    // e.g. org/apache/iceberg/transforms/IcebergPartitionUtil.class
    "org/apache/iceberg/",
    "com/github/benmanes/caffeine/"
  )

  test("audit files in assembly jar") {
    // Step 1: load the jar (and make sure it exists)
    // scalastyle:off classforname
    val classUrl = Class.forName("org.apache.spark.sql.delta.icebergShaded.IcebergConverter")
      .getResource("IcebergConverter.class")
    // scalastyle:on classforname
    assert(classUrl != null, "Could not find delta-iceberg jar")
    val connection = classUrl.openConnection().asInstanceOf[JarURLConnection]
    val url = connection.getJarFileURL
    val jarFile = new JarFile(new File(url.toURI))

    // Step 2: Verify the JAR has the classes we want it to have
    try {
      val jarClasses = jarFile
        .entries()
        .asScala
        .filter(!_.isDirectory)
        .map(_.toString)
        .filter(_.endsWith(".class")) // let's ignore any .properties or META-INF files for now
        .toSet

      // 2.1: Verify there are no prohibited classes (e.g. io/delta/storage/...)
      //
      // You can test this code path by commenting out the "io/delta" match case of the
      // assemblyMergeStrategy config in build.sbt.
      val prohibitedJarClasses = jarClasses
        .filter { clazz => !allowedClassPrefixes.exists(prefix => clazz.startsWith(prefix)) }

      if (prohibitedJarClasses.nonEmpty) {
        throw new Exception(
          s"Prohibited jar class(es) found:\n- ${prohibitedJarClasses.mkString("\n- ")}"
        )
      }

      // 2.2: Verify that, for each allowed class prefix, we actually loaded a class for it
      // (instead of, say, loading an empty jar).
      //
      // You can test this code path by adding the following code snippet to the delta-iceberg
      // assemblyMergeStrategy config in build.sbt:
      // case PathList("shadedForDelta", xs @ _*) => MergeStrategy.discard

      // Map of prefix -> # classes with that prefix
      val allowedClassesCounts = scala.collection.mutable.Map(
        allowedClassPrefixes.map(prefix => (prefix, 0)) : _*
      )
      jarClasses.foreach { clazz =>
        allowedClassPrefixes.foreach { prefix =>
          if (clazz.startsWith(prefix)) {
            allowedClassesCounts(prefix) += 1
          }
        }
      }
      val missingClasses = allowedClassesCounts.filter(_._2 == 0).keys
      if (missingClasses.nonEmpty) {
        throw new Exception(
          s"No classes found for the following prefix(es):\n- ${missingClasses.mkString("\n- ")}"
        )
      }
    } finally {
      jarFile.close()
    }
  }
}
