diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index bdf81d22efa4..13ae6aca38b8 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -193,12 +193,15 @@ private[spark] object TestUtils { baseClass: String = null, classpathUrls: Seq[URL] = Seq.empty, implementsClasses: Seq[String] = Seq.empty, - extraCodeBody: String = ""): File = { + extraCodeBody: String = "", + packageName: Option[String] = None): File = { val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("") val implementsText = "implements " + (implementsClasses :+ "java.io.Serializable").mkString(", ") + val packageText = packageName.map(p => s"package $p;\n").getOrElse("") val sourceFile = new JavaSourceFromString(className, s""" + |$packageText |public class $className $extendsText $implementsText { | @Override public String toString() { return "$toStringValue"; } | diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index fe9bdef3d0e1..4637a4a01798 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hive import java.io.File -import java.net.{URL, URLClassLoader} +import java.net.URL import java.util.Locale import java.util.concurrent.TimeUnit @@ -26,7 +26,6 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap import scala.util.Try -import org.apache.commons.lang3.{JavaVersion, SystemUtils} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars @@ -46,7 +45,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf._ import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH import org.apache.spark.sql.types._ -import org.apache.spark.util.{ChildFirstURLClassLoader, Utils} +import org.apache.spark.util.Utils private[spark] object HiveUtils extends Logging { @@ -409,43 +408,15 @@ private[spark] object HiveUtils extends Logging { s"or change ${HIVE_METASTORE_VERSION.key} to $builtinHiveVersion.") } - // We recursively find all jars in the class loader chain, - // starting from the given classLoader. - def allJars(classLoader: ClassLoader): Array[URL] = classLoader match { - case null => Array.empty[URL] - case childFirst: ChildFirstURLClassLoader => - childFirst.getURLs() ++ allJars(Utils.getSparkClassLoader) - case urlClassLoader: URLClassLoader => - urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent) - case other => allJars(other.getParent) - } - - val classLoader = Utils.getContextOrSparkClassLoader - val jars: Array[URL] = if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) { - // Do nothing. The system classloader is no longer a URLClassLoader in Java 9, - // so it won't match the case in allJars. It no longer exposes URLs of - // the system classpath - Array.empty[URL] - } else { - val loadedJars = allJars(classLoader) - // Verify at least one jar was found - if (loadedJars.length == 0) { - throw new IllegalArgumentException( - "Unable to locate hive jars to connect to metastore. " + - s"Please set ${HIVE_METASTORE_JARS.key}.") - } - loadedJars - } - logInfo( s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.") new IsolatedClientLoader( version = metaVersion, sparkConf = conf, hadoopConf = hadoopConf, - execJars = jars.toSeq, config = configurations, - isolationOn = !isCliSessionState(), + isolationOn = false, + sessionStateIsolationOverride = Some(!isCliSessionState()), barrierPrefixes = hiveMetastoreBarrierPrefixes, sharedPrefixes = hiveMetastoreSharedPrefixes) } else if (hiveMetastoreJars == "maven") { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 0a83ec2689c8..f76cc7f3a412 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -134,7 +134,7 @@ private[hive] class HiveClientImpl( // Create an internal session state for this HiveClientImpl. val state: SessionState = { val original = Thread.currentThread().getContextClassLoader - if (clientLoader.isolationOn) { + if (clientLoader.sessionStateIsolationOn) { // Switch to the initClassLoader. Thread.currentThread().setContextClassLoader(initClassLoader) try { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index e65e6d42937c..2756a2b18a99 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -180,6 +180,9 @@ private[hive] object IsolatedClientLoader extends Logging { * @param config A set of options that will be added to the HiveConf of the constructed client. * @param isolationOn When true, custom versions of barrier classes will be constructed. Must be * true unless loading the version of hive that is on Spark's classloader. + * @param sessionStateIsolationOverride If present, this parameter will specify the value of + * `sessionStateIsolationOn`. If empty (the default), the + * value of `isolationOn` will be used. * @param baseClassLoader The spark classloader that is used to load shared classes. */ private[hive] class IsolatedClientLoader( @@ -189,11 +192,19 @@ private[hive] class IsolatedClientLoader( val execJars: Seq[URL] = Seq.empty, val config: Map[String, String] = Map.empty, val isolationOn: Boolean = true, + sessionStateIsolationOverride: Option[Boolean] = None, val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader, val sharedPrefixes: Seq[String] = Seq.empty, val barrierPrefixes: Seq[String] = Seq.empty) extends Logging { + /** + * This controls whether the generated clients maintain an independent/isolated copy of the + * Hive `SessionState`. If false, the Hive will leverage the global/static copy of + * `SessionState`; if true, it will generate a new copy of the state internally. + */ + val sessionStateIsolationOn: Boolean = sessionStateIsolationOverride.getOrElse(isolationOn) + /** All jars used by the hive specific classloader. */ protected def allJars = execJars.toArray @@ -232,51 +243,46 @@ private[hive] class IsolatedClientLoader( private[hive] val classLoader: MutableURLClassLoader = { val isolatedClassLoader = if (isolationOn) { - if (allJars.isEmpty) { - // See HiveUtils; this is the Java 9+ + builtin mode scenario - baseClassLoader - } else { - val rootClassLoader: ClassLoader = - if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) { - // In Java 9, the boot classloader can see few JDK classes. The intended parent - // classloader for delegation is now the platform classloader. - // See http://java9.wtf/class-loading/ - val platformCL = - classOf[ClassLoader].getMethod("getPlatformClassLoader"). - invoke(null).asInstanceOf[ClassLoader] - // Check to make sure that the root classloader does not know about Hive. - assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure) - platformCL + val rootClassLoader: ClassLoader = + if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) { + // In Java 9, the boot classloader can see few JDK classes. The intended parent + // classloader for delegation is now the platform classloader. + // See http://java9.wtf/class-loading/ + val platformCL = + classOf[ClassLoader].getMethod("getPlatformClassLoader"). + invoke(null).asInstanceOf[ClassLoader] + // Check to make sure that the root classloader does not know about Hive. + assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure) + platformCL + } else { + // The boot classloader is represented by null (the instance itself isn't accessible) + // and before Java 9 can see all JDK classes + null + } + new URLClassLoader(allJars, rootClassLoader) { + override def loadClass(name: String, resolve: Boolean): Class[_] = { + val loaded = findLoadedClass(name) + if (loaded == null) doLoadClass(name, resolve) else loaded + } + def doLoadClass(name: String, resolve: Boolean): Class[_] = { + val classFileName = name.replaceAll("\\.", "/") + ".class" + if (isBarrierClass(name)) { + // For barrier classes, we construct a new copy of the class. + val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName)) + logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}") + defineClass(name, bytes, 0, bytes.length) + } else if (!isSharedClass(name)) { + logDebug(s"hive class: $name - ${getResource(classToPath(name))}") + super.loadClass(name, resolve) } else { - // The boot classloader is represented by null (the instance itself isn't accessible) - // and before Java 9 can see all JDK classes - null - } - new URLClassLoader(allJars, rootClassLoader) { - override def loadClass(name: String, resolve: Boolean): Class[_] = { - val loaded = findLoadedClass(name) - if (loaded == null) doLoadClass(name, resolve) else loaded - } - def doLoadClass(name: String, resolve: Boolean): Class[_] = { - val classFileName = name.replaceAll("\\.", "/") + ".class" - if (isBarrierClass(name)) { - // For barrier classes, we construct a new copy of the class. - val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName)) - logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}") - defineClass(name, bytes, 0, bytes.length) - } else if (!isSharedClass(name)) { - logDebug(s"hive class: $name - ${getResource(classToPath(name))}") - super.loadClass(name, resolve) - } else { - // For shared classes, we delegate to baseClassLoader, but fall back in case the - // class is not found. - logDebug(s"shared class: $name") - try { - baseClassLoader.loadClass(name) - } catch { - case _: ClassNotFoundException => - super.loadClass(name, resolve) - } + // For shared classes, we delegate to baseClassLoader, but fall back in case the + // class is not found. + logDebug(s"shared class: $name") + try { + baseClassLoader.loadClass(name) + } catch { + case _: ClassNotFoundException => + super.loadClass(name, resolve) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala index d8e1e0129282..823ac8ed957e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala @@ -17,15 +17,19 @@ package org.apache.spark.sql.hive +import java.io.File +import java.net.URI + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, TestUtils} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.catalog.CatalogDatabase import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.util.ChildFirstURLClassLoader +import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader} class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { @@ -77,6 +81,32 @@ class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton } } + test("SPARK-42539: User-provided JARs should not take precedence over builtin Hive JARs") { + withTempDir { tmpDir => + val classFile = TestUtils.createCompiledClass( + "Hive", tmpDir, packageName = Some("org.apache.hadoop.hive.ql.metadata")) + + val jarFile = new File(tmpDir, "hive-fake.jar") + TestUtils.createJar(Seq(classFile), jarFile, Some("org/apache/hadoop/hive/ql/metadata")) + + val conf = new SparkConf + val contextClassLoader = Thread.currentThread().getContextClassLoader + val loader = new MutableURLClassLoader(Array(jarFile.toURI.toURL), contextClassLoader) + try { + Thread.currentThread().setContextClassLoader(loader) + val client = HiveUtils.newClientForMetadata( + conf, + SparkHadoopUtil.newConfiguration(conf), + HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true)) + client.createDatabase( + CatalogDatabase("foo", "", URI.create(s"file://${tmpDir.getAbsolutePath}/foo.db"), Map()), + ignoreIfExists = true) + } finally { + Thread.currentThread().setContextClassLoader(contextClassLoader) + } + } + } + test("SPARK-27349: Dealing with TimeVars removed in Hive 2.x") { // Test default value val defaultConf = new Configuration