Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,18 @@ package org.apache.spark.sql.hive

import java.io.File
import java.net.{URL, URLClassLoader}
import java.nio.charset.StandardCharsets
import java.sql.Timestamp
import java.util.Locale
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import scala.language.implicitConversions

import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
import org.apache.hadoop.util.VersionInfo

import org.apache.spark.{SparkConf, SparkContext}
Expand Down Expand Up @@ -329,10 +326,17 @@ private[spark] object HiveUtils extends Logging {

val classLoader = Utils.getContextOrSparkClassLoader
val jars = allJars(classLoader)
if (jars.length == 0) {
throw new IllegalArgumentException(
"Unable to locate hive jars to connect to metastore. " +
s"Please set ${HIVE_METASTORE_JARS.key}.")
if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
// Do nothing. The system classloader is no longer a URLClassLoader in Java 9,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean we are unable to detect the missing Hive jar with Java 9+?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code above tries to get the URLs of all the JARs in the classpath from the system classloader. This isn't available in Java 9. However these only seem to be needed in 'builtin' mode, in which case (I believe) we don't need to do this at all -- don't make a new ClassLoader, just use the current ClassLoader.

@squito has an alternative, to read the classpath off of java.class.path. I think that works too. It's less change, but, I was also wondering whether the whole idea of making a new ClassLoader in this mode isn't necessary anyway.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think your approach makes more sense. I was a little worried that someone would add a jar directly to classloader, from java code, which wouldn't show up on java.class.path. probably an unsupported gray area anyway, but this might make it work in a few more cases

// so it won't match the case in allJars above. It no longer exposes URLs of
// the system classpath
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have something slightly different here -- admittedly I wasn't really sure if this was the right thing to do or not, it was just a hack to let me get a little further, not principled at all.

      // For java 11, allJars() does *not* get jars on the classpath added, so we add
      // them here
      val classPathJars = sys.props("java.class.path").split(":").map(new File(_).toURI().toURL())
      logWarning(s"Classpath jars = ${classPathJars.mkString(",")}")
      val jars = allJars(classLoader) ++ classPathJars

I think this will solve your HiveClientImpl issue ... but that doesn't mean its the right thing to do :P

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah nice call there. I'll try just avoiding a whole new classloader here too just to see if it works; for 'builtin' that seems simpler. But I am wary of changes. But I am wary of depending on something else in the env too, even though that's pretty safe.

} else {
// Verify at least one jar was found
if (jars.length == 0) {
throw new IllegalArgumentException(
"Unable to locate hive jars to connect to metastore. " +
s"Please set ${HIVE_METASTORE_JARS.key}.")
}
}

logInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import java.util
import scala.util.Try

import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf.ConfVars

Expand Down Expand Up @@ -157,7 +158,6 @@ private[hive] object IsolatedClientLoader extends Logging {
* @param isolationOn When true, custom versions of barrier classes will be constructed. Must be
* true unless loading the version of hive that is on Sparks classloader.
* @param sharesHadoopClasses When true, we will share Hadoop classes between Spark and
* @param rootClassLoader The system root classloader. Must not know about Hive classes.
* @param baseClassLoader The spark classloader that is used to load shared classes.
*/
private[hive] class IsolatedClientLoader(
Expand All @@ -168,15 +168,11 @@ private[hive] class IsolatedClientLoader(
val config: Map[String, String] = Map.empty,
val isolationOn: Boolean = true,
val sharesHadoopClasses: Boolean = true,
val rootClassLoader: ClassLoader = ClassLoader.getSystemClassLoader.getParent.getParent,
val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader,
val sharedPrefixes: Seq[String] = Seq.empty,
val barrierPrefixes: Seq[String] = Seq.empty)
extends Logging {

// Check to make sure that the root classloader does not know about Hive.
assert(Try(rootClassLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This always failed before with an NPE. The root classloader is represented by null!


/** All jars used by the hive specific classloader. */
protected def allJars = execJars.toArray

Expand All @@ -191,8 +187,8 @@ private[hive] class IsolatedClientLoader(
(sharesHadoopClasses && isHadoopClass) ||
name.startsWith("scala.") ||
(name.startsWith("com.google") && !name.startsWith("com.google.cloud")) ||
name.startsWith("java.lang.") ||
name.startsWith("java.net") ||
name.startsWith("java.") ||
name.startsWith("javax.sql.") ||
sharedPrefixes.exists(name.startsWith)
}

Expand All @@ -214,30 +210,51 @@ private[hive] class IsolatedClientLoader(
private[hive] val classLoader: MutableURLClassLoader = {
val isolatedClassLoader =
if (isolationOn) {
new URLClassLoader(allJars, rootClassLoader) {
override def loadClass(name: String, resolve: Boolean): Class[_] = {
val loaded = findLoadedClass(name)
if (loaded == null) doLoadClass(name, resolve) else loaded
}
def doLoadClass(name: String, resolve: Boolean): Class[_] = {
val classFileName = name.replaceAll("\\.", "/") + ".class"
if (isBarrierClass(name)) {
// For barrier classes, we construct a new copy of the class.
val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
defineClass(name, bytes, 0, bytes.length)
} else if (!isSharedClass(name)) {
logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
super.loadClass(name, resolve)
if (allJars.isEmpty) {
// See HiveUtils; this is the Java 9+ + builtin mode scenario
baseClassLoader
} else {
val rootClassLoader: ClassLoader =
if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
// In Java 9, the boot classloader can see few JDK classes. The intended parent
// classloader for delegation is now the platform classloader.
// See http://java9.wtf/class-loading/
val platformCL =
classOf[ClassLoader].getMethod("getPlatformClassLoader").
invoke(null).asInstanceOf[ClassLoader]
// Check to make sure that the root classloader does not know about Hive.
assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)
platformCL
} else {
// For shared classes, we delegate to baseClassLoader, but fall back in case the
// class is not found.
logDebug(s"shared class: $name")
try {
baseClassLoader.loadClass(name)
} catch {
case _: ClassNotFoundException =>
super.loadClass(name, resolve)
// The boot classloader is represented by null (the instance itself isn't accessible)
// and before Java 9 can see all JDK classes
null
}
new URLClassLoader(allJars, rootClassLoader) {
override def loadClass(name: String, resolve: Boolean): Class[_] = {
val loaded = findLoadedClass(name)
if (loaded == null) doLoadClass(name, resolve) else loaded
}
def doLoadClass(name: String, resolve: Boolean): Class[_] = {
val classFileName = name.replaceAll("\\.", "/") + ".class"
if (isBarrierClass(name)) {
// For barrier classes, we construct a new copy of the class.
val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
defineClass(name, bytes, 0, bytes.length)
} else if (!isSharedClass(name)) {
logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
super.loadClass(name, resolve)
} else {
// For shared classes, we delegate to baseClassLoader, but fall back in case the
// class is not found.
logDebug(s"shared class: $name")
try {
baseClassLoader.loadClass(name)
} catch {
case _: ClassNotFoundException =>
super.loadClass(name, resolve)
}
}
}
}
Expand Down