From 4091439de6697afcc8b61acd62f0ab4f85204e3f Mon Sep 17 00:00:00 2001
From: Erik Krogen
Date: Mon, 27 Feb 2023 14:57:52 -0800
Subject: [PATCH 1/2] [SPARK-42539][SQL][HIVE] Eliminate separate classloader
 when using 'builtin' Hive version for metadata client

### What changes were proposed in this pull request?
When using the 'builtin' Hive version for the Hive metadata client, do not
create a separate classloader; instead, continue to use the overall
user/application classloader, regardless of Java version. This standardizes
the behavior for all Java versions with that of Java 9+. See SPARK-42539 for
more details on why this approach was chosen.

### Why are the changes needed?
A much more detailed description is in SPARK-42539. The tl;dr is that
user-provided JARs (such as `hive-exec-2.3.8.jar`) take precedence over
Spark/system JARs when constructing the classloader used by
`IsolatedClientLoader` on Java 8 in 'builtin' mode, which can cause
unexpected behavior and/or breakages. This violates the expectation that,
unless user-first classloader mode is used, Spark JARs should be prioritized
over user JARs.

It also seems that this separate classloader was unnecessary from the start,
since the intent of 'builtin' mode is to use the JARs already existing on the
regular classloader (as alluded to
[here](https://github.com/apache/spark/pull/24057#discussion_r265142878)).
The isolated client loader was originally added in #5876 in 2015. This bit in
the PR description is the only mention of the behavior for "builtin":

> attempt to discover the jars that were used to load Spark SQL and use
> those. This option is only valid when using the execution version of Hive.

I can't follow the logic here; the user classloader clearly has all of the
necessary Hive JARs, since that's where we're getting the JAR URLs from, so
we could just use that classloader directly instead of grabbing the URLs.
When this was initially added, it only used the JARs from the user
classloader, not any of its parents, which I suspect was the motivating
factor (to try to avoid more Spark classes being duplicated inside of the
isolated classloader). But that was changed a month later anyway in
#6435 / #6459, so I think this may have basically been dead code from the
start. It has also caused at least one issue over the years, e.g.
SPARK-21428, which disables the new-classloader behavior when running inside
of a CLI session.

### Does this PR introduce _any_ user-facing change?
No, except to protect Spark itself from potentially being broken by bad user
JARs.

### How was this patch tested?
This includes a new unit test in `HiveUtilsSuite` which demonstrates the
issue and shows that this approach resolves it. It has also been tested on a
live cluster running Java 8, where Hive communication functionality continues
to work as expected.
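
A condensed sketch of the scenario the new `HiveUtilsSuite` test exercises is shown below.
This is illustrative only: it reuses Spark's internal `TestUtils` and `MutableURLClassLoader`
test helpers, the temp directory name is chosen here just for the sketch, and it ends with the
same `createDatabase` call the test makes.

```scala
import java.io.File
import java.net.URI

import org.apache.spark.{SparkConf, TestUtils}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.util.MutableURLClassLoader

// Build a jar containing a fake org.apache.hadoop.hive.ql.metadata.Hive class.
val tmpDir = new File(System.getProperty("java.io.tmpdir"), "spark-42539-sketch")
tmpDir.mkdirs()
val classFile = TestUtils.createCompiledClass(
  "Hive", tmpDir, packageName = Some("org.apache.hadoop.hive.ql.metadata"))
val jarFile = new File(tmpDir, "hive-fake.jar")
TestUtils.createJar(Seq(classFile), jarFile, Some("org/apache/hadoop/hive/ql/metadata"))

// Place the fake jar on the context classloader, as a user-provided jar would be.
val conf = new SparkConf
val originalLoader = Thread.currentThread().getContextClassLoader
val userLoader = new MutableURLClassLoader(Array(jarFile.toURI.toURL), originalLoader)
try {
  Thread.currentThread().setContextClassLoader(userLoader)
  // Previously, on Java 8 in 'builtin' mode, IsolatedClientLoader collected jar URLs from this
  // loader and built a new classloader in which the fake Hive class shadowed Spark's builtin
  // copy. With this change the metadata client reuses the application classloader directly,
  // so Spark's own Hive classes keep precedence and the call below succeeds.
  val client = HiveUtils.newClientForMetadata(
    conf,
    SparkHadoopUtil.newConfiguration(conf),
    HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true))
  client.createDatabase(
    CatalogDatabase("foo", "", URI.create(s"file://${tmpDir.getAbsolutePath}/foo.db"), Map()),
    ignoreIfExists = true)
} finally {
  Thread.currentThread().setContextClassLoader(originalLoader)
}
```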
---
 .../scala/org/apache/spark/TestUtils.scala    |  5 +-
 .../org/apache/spark/sql/hive/HiveUtils.scala | 53 +-----------
 .../hive/client/IsolatedClientLoader.scala    | 83 +++++++++----------
 .../spark/sql/hive/HiveUtilsSuite.scala       | 34 +++++++-
 4 files changed, 78 insertions(+), 97 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index bdf81d22efa4..13ae6aca38b8 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -193,12 +193,15 @@ private[spark] object TestUtils {
       baseClass: String = null,
       classpathUrls: Seq[URL] = Seq.empty,
       implementsClasses: Seq[String] = Seq.empty,
-      extraCodeBody: String = ""): File = {
+      extraCodeBody: String = "",
+      packageName: Option[String] = None): File = {
     val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
     val implementsText =
       "implements " + (implementsClasses :+ "java.io.Serializable").mkString(", ")
+    val packageText = packageName.map(p => s"package $p;\n").getOrElse("")
     val sourceFile = new JavaSourceFromString(className,
       s"""
+         |$packageText
         |public class $className $extendsText $implementsText {
         |  @Override public String toString() { return "$toStringValue"; }
         |
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index fe9bdef3d0e1..1a0cac42fa79 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive
 
 import java.io.File
-import java.net.{URL, URLClassLoader}
+import java.net.URL
 import java.util.Locale
 import java.util.concurrent.TimeUnit
 
@@ -26,11 +26,9 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 import scala.util.Try
 
-import org.apache.commons.lang3.{JavaVersion, SystemUtils}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.util.VersionInfo
 import org.apache.hive.common.util.HiveVersionInfo
 
@@ -46,7 +44,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf._
 import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
 import org.apache.spark.sql.types._
-import org.apache.spark.util.{ChildFirstURLClassLoader, Utils}
+import org.apache.spark.util.Utils
 
 private[spark] object HiveUtils extends Logging {
 
@@ -321,22 +319,6 @@ private[spark] object HiveUtils extends Logging {
     (commonTimeVars ++ hardcodingTimeVars).toMap
   }
 
-  /**
-   * Check current Thread's SessionState type
-   * @return true when SessionState.get returns an instance of CliSessionState,
-   *         false when it gets non-CliSessionState instance or null
-   */
-  def isCliSessionState(): Boolean = {
-    val state = SessionState.get
-    var temp: Class[_] = if (state != null) state.getClass else null
-    var found = false
-    while (temp != null && !found) {
-      found = temp.getName == "org.apache.hadoop.hive.cli.CliSessionState"
-      temp = temp.getSuperclass
-    }
-    found
-  }
-
   /**
    * Create a [[HiveClient]] used for execution.
    *
@@ -409,43 +391,14 @@ private[spark] object HiveUtils extends Logging {
           s"or change ${HIVE_METASTORE_VERSION.key} to $builtinHiveVersion.")
       }
 
-      // We recursively find all jars in the class loader chain,
-      // starting from the given classLoader.
-      def allJars(classLoader: ClassLoader): Array[URL] = classLoader match {
-        case null => Array.empty[URL]
-        case childFirst: ChildFirstURLClassLoader =>
-          childFirst.getURLs() ++ allJars(Utils.getSparkClassLoader)
-        case urlClassLoader: URLClassLoader =>
-          urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent)
-        case other => allJars(other.getParent)
-      }
-
-      val classLoader = Utils.getContextOrSparkClassLoader
-      val jars: Array[URL] = if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
-        // Do nothing. The system classloader is no longer a URLClassLoader in Java 9,
-        // so it won't match the case in allJars. It no longer exposes URLs of
-        // the system classpath
-        Array.empty[URL]
-      } else {
-        val loadedJars = allJars(classLoader)
-        // Verify at least one jar was found
-        if (loadedJars.length == 0) {
-          throw new IllegalArgumentException(
-            "Unable to locate hive jars to connect to metastore. " +
-              s"Please set ${HIVE_METASTORE_JARS.key}.")
-        }
-        loadedJars
-      }
-
       logInfo(
         s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.")
       new IsolatedClientLoader(
         version = metaVersion,
         sparkConf = conf,
         hadoopConf = hadoopConf,
-        execJars = jars.toSeq,
         config = configurations,
-        isolationOn = !isCliSessionState(),
+        isolationOn = false,
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
         sharedPrefixes = hiveMetastoreSharedPrefixes)
     } else if (hiveMetastoreJars == "maven") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index e65e6d42937c..879b2451cae2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -232,51 +232,46 @@ private[hive] class IsolatedClientLoader(
   private[hive] val classLoader: MutableURLClassLoader = {
     val isolatedClassLoader = if (isolationOn) {
-      if (allJars.isEmpty) {
-        // See HiveUtils; this is the Java 9+ + builtin mode scenario
-        baseClassLoader
-      } else {
-        val rootClassLoader: ClassLoader =
-          if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
-            // In Java 9, the boot classloader can see few JDK classes. The intended parent
-            // classloader for delegation is now the platform classloader.
-            // See http://java9.wtf/class-loading/
-            val platformCL =
-              classOf[ClassLoader].getMethod("getPlatformClassLoader").
-                invoke(null).asInstanceOf[ClassLoader]
-            // Check to make sure that the root classloader does not know about Hive.
-            assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)
-            platformCL
+      val rootClassLoader: ClassLoader =
+        if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
+          // In Java 9, the boot classloader can see few JDK classes. The intended parent
+          // classloader for delegation is now the platform classloader.
+          // See http://java9.wtf/class-loading/
+          val platformCL =
+            classOf[ClassLoader].getMethod("getPlatformClassLoader").
+              invoke(null).asInstanceOf[ClassLoader]
+          // Check to make sure that the root classloader does not know about Hive.
+          assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)
+          platformCL
+        } else {
+          // The boot classloader is represented by null (the instance itself isn't accessible)
+          // and before Java 9 can see all JDK classes
+          null
+        }
+      new URLClassLoader(allJars, rootClassLoader) {
+        override def loadClass(name: String, resolve: Boolean): Class[_] = {
+          val loaded = findLoadedClass(name)
+          if (loaded == null) doLoadClass(name, resolve) else loaded
+        }
+        def doLoadClass(name: String, resolve: Boolean): Class[_] = {
+          val classFileName = name.replaceAll("\\.", "/") + ".class"
+          if (isBarrierClass(name)) {
+            // For barrier classes, we construct a new copy of the class.
+            val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
+            logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
+            defineClass(name, bytes, 0, bytes.length)
+          } else if (!isSharedClass(name)) {
+            logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
+            super.loadClass(name, resolve)
           } else {
-            // The boot classloader is represented by null (the instance itself isn't accessible)
-            // and before Java 9 can see all JDK classes
-            null
-          }
-        new URLClassLoader(allJars, rootClassLoader) {
-          override def loadClass(name: String, resolve: Boolean): Class[_] = {
-            val loaded = findLoadedClass(name)
-            if (loaded == null) doLoadClass(name, resolve) else loaded
-          }
-          def doLoadClass(name: String, resolve: Boolean): Class[_] = {
-            val classFileName = name.replaceAll("\\.", "/") + ".class"
-            if (isBarrierClass(name)) {
-              // For barrier classes, we construct a new copy of the class.
-              val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
-              logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
-              defineClass(name, bytes, 0, bytes.length)
-            } else if (!isSharedClass(name)) {
-              logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
-              super.loadClass(name, resolve)
-            } else {
-              // For shared classes, we delegate to baseClassLoader, but fall back in case the
-              // class is not found.
+ logDebug(s"shared class: $name") + try { + baseClassLoader.loadClass(name) + } catch { + case _: ClassNotFoundException => + super.loadClass(name, resolve) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala index d8e1e0129282..823ac8ed957e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala @@ -17,15 +17,19 @@ package org.apache.spark.sql.hive +import java.io.File +import java.net.URI + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, TestUtils} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.catalog.CatalogDatabase import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.util.ChildFirstURLClassLoader +import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader} class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { @@ -77,6 +81,32 @@ class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton } } + test("SPARK-42539: User-provided JARs should not take precedence over builtin Hive JARs") { + withTempDir { tmpDir => + val classFile = TestUtils.createCompiledClass( + "Hive", tmpDir, packageName = Some("org.apache.hadoop.hive.ql.metadata")) + + val jarFile = new File(tmpDir, "hive-fake.jar") + TestUtils.createJar(Seq(classFile), jarFile, Some("org/apache/hadoop/hive/ql/metadata")) + + val conf = new SparkConf + val contextClassLoader = Thread.currentThread().getContextClassLoader + val loader = new MutableURLClassLoader(Array(jarFile.toURI.toURL), contextClassLoader) + try { + Thread.currentThread().setContextClassLoader(loader) + val client = HiveUtils.newClientForMetadata( + conf, + SparkHadoopUtil.newConfiguration(conf), + HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true)) + client.createDatabase( + CatalogDatabase("foo", "", URI.create(s"file://${tmpDir.getAbsolutePath}/foo.db"), Map()), + ignoreIfExists = true) + } finally { + Thread.currentThread().setContextClassLoader(contextClassLoader) + } + } + } + test("SPARK-27349: Dealing with TimeVars removed in Hive 2.x") { // Test default value val defaultConf = new Configuration From a31cd9bf95cd1282ea6fe6880693ad52332fc145 Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Tue, 28 Feb 2023 13:02:29 -0800 Subject: [PATCH 2/2] Address test failures by allowing for independently adjusting isolationOn (for classpath isolation) vs sessionStateIsolationOn (for SessionState isolation) --- .../org/apache/spark/sql/hive/HiveUtils.scala | 18 ++++++++++++++++++ .../spark/sql/hive/client/HiveClientImpl.scala | 2 +- .../sql/hive/client/IsolatedClientLoader.scala | 11 +++++++++++ 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 1a0cac42fa79..4637a4a01798 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -29,6 +29,7 @@ import scala.util.Try import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.conf.HiveConf import 
---
 .../org/apache/spark/sql/hive/HiveUtils.scala      | 18 ++++++++++++++++++
 .../spark/sql/hive/client/HiveClientImpl.scala     |  2 +-
 .../sql/hive/client/IsolatedClientLoader.scala     | 11 +++++++++++
 3 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 1a0cac42fa79..4637a4a01798 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -29,6 +29,7 @@ import scala.util.Try
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.util.VersionInfo
 import org.apache.hive.common.util.HiveVersionInfo
 
@@ -319,6 +320,22 @@ private[spark] object HiveUtils extends Logging {
     (commonTimeVars ++ hardcodingTimeVars).toMap
   }
 
+  /**
+   * Check current Thread's SessionState type
+   * @return true when SessionState.get returns an instance of CliSessionState,
+   *         false when it gets non-CliSessionState instance or null
+   */
+  def isCliSessionState(): Boolean = {
+    val state = SessionState.get
+    var temp: Class[_] = if (state != null) state.getClass else null
+    var found = false
+    while (temp != null && !found) {
+      found = temp.getName == "org.apache.hadoop.hive.cli.CliSessionState"
+      temp = temp.getSuperclass
+    }
+    found
+  }
+
   /**
    * Create a [[HiveClient]] used for execution.
    *
@@ -399,6 +416,7 @@ private[spark] object HiveUtils extends Logging {
         hadoopConf = hadoopConf,
         config = configurations,
         isolationOn = false,
+        sessionStateIsolationOverride = Some(!isCliSessionState()),
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
         sharedPrefixes = hiveMetastoreSharedPrefixes)
     } else if (hiveMetastoreJars == "maven") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 0a83ec2689c8..f76cc7f3a412 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -134,7 +134,7 @@ private[hive] class HiveClientImpl(
   // Create an internal session state for this HiveClientImpl.
   val state: SessionState = {
     val original = Thread.currentThread().getContextClassLoader
-    if (clientLoader.isolationOn) {
+    if (clientLoader.sessionStateIsolationOn) {
       // Switch to the initClassLoader.
       Thread.currentThread().setContextClassLoader(initClassLoader)
       try {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 879b2451cae2..2756a2b18a99 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -180,6 +180,9 @@ private[hive] object IsolatedClientLoader extends Logging {
  * @param config A set of options that will be added to the HiveConf of the constructed client.
  * @param isolationOn When true, custom versions of barrier classes will be constructed. Must be
  *                    true unless loading the version of hive that is on Spark's classloader.
+ * @param sessionStateIsolationOverride If present, this parameter will specify the value of
+ *                                      `sessionStateIsolationOn`. If empty (the default), the
+ *                                      value of `isolationOn` will be used.
  * @param baseClassLoader The spark classloader that is used to load shared classes.
  */
 private[hive] class IsolatedClientLoader(
@@ -189,11 +192,19 @@ private[hive] class IsolatedClientLoader(
     val execJars: Seq[URL] = Seq.empty,
     val config: Map[String, String] = Map.empty,
     val isolationOn: Boolean = true,
+    sessionStateIsolationOverride: Option[Boolean] = None,
     val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader,
     val sharedPrefixes: Seq[String] = Seq.empty,
     val barrierPrefixes: Seq[String] = Seq.empty)
   extends Logging {
 
+  /**
+   * This controls whether the generated clients maintain an independent/isolated copy of the
+   * Hive `SessionState`. If false, the Hive will leverage the global/static copy of
+   * `SessionState`; if true, it will generate a new copy of the state internally.
+   */
+  val sessionStateIsolationOn: Boolean = sessionStateIsolationOverride.getOrElse(isolationOn)
+
   /** All jars used by the hive specific classloader. */
   protected def allJars = execJars.toArray