Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,29 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
/**
 * The configured location of the jars that should be used to instantiate the Hive
 * metastore client. Reads `HIVE_METASTORE_JARS`, falling back to "builtin" when unset.
 */
protected[hive] def hiveMetastoreJars: String = {
  val defaultSetting = "builtin"
  getConf(HIVE_METASTORE_JARS, defaultSetting)
}

/**
 * A comma separated list of class prefixes that should be loaded using the classloader
 * that is shared between Spark SQL and a specific version of Hive. An example of classes
 * that should be shared is JDBC drivers that are needed to talk to the metastore. Other
 * classes that need to be shared are those that interact with classes that are already
 * shared — for example, custom appenders used by log4j.
 */
protected[hive] def hiveMetastoreSharedPrefixes: Seq[String] = {
  // Default to the well-known JDBC driver prefixes when the user has not configured any.
  val configured = getConf("spark.sql.hive.metastore.sharedPrefixes", jdbcPrefixes)
  configured.split(",").toSeq.filter(_.nonEmpty)
}

/** Default shared-prefix list: JDBC driver packages commonly used to reach the metastore. */
private def jdbcPrefixes: String = List(
  "com.mysql.jdbc",
  "org.postgresql",
  "com.microsoft.sqlserver",
  "oracle.jdbc"
).mkString(",")

/**
 * A comma separated list of class prefixes that should explicitly be reloaded for each
 * version of Hive that Spark SQL is communicating with — for example, Hive UDFs declared
 * in a prefix that would typically be shared (i.e. org.apache.spark.*).
 */
protected[hive] def hiveMetastoreBarrierPrefixes: Seq[String] = {
  // Empty default: no extra barrier prefixes unless explicitly configured.
  val raw = getConf("spark.sql.hive.metastore.barrierPrefixes", "")
  raw.split(",").toSeq.filter(_.nonEmpty)
}

// Substitutes variables in query text before it is parsed.
// NOTE(review): assumed to be hive-ql's VariableSubstitution (e.g. ${hiveconf:...}
// expansion) — confirm against the import at the top of the file.
// @transient: the substitutor is not serializable state and is rebuilt per JVM.
@transient
protected[sql] lazy val substitutor = new VariableSubstitution()

Expand Down Expand Up @@ -179,12 +202,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
version = metaVersion,
execJars = jars.toSeq,
config = allConfig,
isolationOn = true)
isolationOn = true,
barrierPrefixes = hiveMetastoreBarrierPrefixes,
sharedPrefixes = hiveMetastoreSharedPrefixes)
} else if (hiveMetastoreJars == "maven") {
// TODO: Support for loading the jars from an already downloaded location.
logInfo(
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.")
IsolatedClientLoader.forVersion(hiveMetastoreVersion, allConfig )
IsolatedClientLoader.forVersion(hiveMetastoreVersion, allConfig)
} else {
// Convert to files and expand any directories.
val jars =
Expand All @@ -210,7 +235,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
version = metaVersion,
execJars = jars.toSeq,
config = allConfig,
isolationOn = true)
isolationOn = true,
barrierPrefixes = hiveMetastoreBarrierPrefixes,
sharedPrefixes = hiveMetastoreSharedPrefixes)
}
isolatedLoader.client
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ private[hive] object IsolatedClientLoader {
(if (version.hasBuiltinsJar) "hive-builtins" :: Nil else Nil))
.map(a => s"org.apache.hive:$a:${version.fullVersion}") :+
"com.google.guava:guava:14.0.1" :+
"org.apache.hadoop:hadoop-client:2.4.0" :+
"mysql:mysql-connector-java:5.1.12"
"org.apache.hadoop:hadoop-client:2.4.0"

val classpath = quietly {
SparkSubmitUtils.resolveMavenCoordinates(
Expand Down Expand Up @@ -106,7 +105,9 @@ private[hive] class IsolatedClientLoader(
val config: Map[String, String] = Map.empty,
val isolationOn: Boolean = true,
val rootClassLoader: ClassLoader = ClassLoader.getSystemClassLoader.getParent.getParent,
val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader)
val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader,
val sharedPrefixes: Seq[String] = Seq.empty,
val barrierPrefixes: Seq[String] = Seq.empty)
extends Logging {

// Check to make sure that the root classloader does not know about Hive.
Expand All @@ -122,13 +123,14 @@ private[hive] class IsolatedClientLoader(
name.startsWith("scala.") ||
name.startsWith("com.google") ||
name.startsWith("java.lang.") ||
name.startsWith("java.net")
name.startsWith("java.net") ||
sharedPrefixes.exists(name.startsWith)

/**
 * True if `name` refers to a spark class that must see a specific version of Hive.
 *
 * Barrier classes are always reloaded inside the isolated classloader so that each
 * Hive-version-specific client gets its own copy, in addition to any user-configured
 * `barrierPrefixes`. (This block also repairs a diff artifact in which both the old and
 * new form of the `ReflectionMagic` line were present, leaving a dangling expression.)
 */
protected def isBarrierClass(name: String): Boolean =
  name.startsWith("org.apache.spark.sql.hive.execution.PairSerDe") ||
    name.startsWith(classOf[ClientWrapper].getName) ||
    name.startsWith(classOf[ReflectionMagic].getName) ||
    barrierPrefixes.exists(name.startsWith)

/**
 * Converts a fully qualified class name to the classpath resource path of its
 * class file, e.g. "org.apache.Foo" -> "org/apache/Foo.class".
 *
 * Uses character-based `String.replace` instead of `replaceAll("\\.", "/")`:
 * the output is identical for a literal dot, but `replaceAll` compiles a regex
 * on every call.
 */
protected def classToPath(name: String): String =
  name.replace('.', '/') + ".class"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,14 @@ import scala.collection.JavaConversions._
// SPARK-3729: Test key required to check for initialization errors with config.
//
// The barrierPrefixes setting forces PairSerDe to be reloaded per Hive version,
// exercising the barrier-classloading path in tests.
// (This block also repairs a diff artifact in which both the old single-line and the
// new multi-line SparkContext constructor were present simultaneously.)
object TestHive
  extends TestHiveContext(
    new SparkContext(
      "local[2]",
      "TestSQLContext",
      new SparkConf()
        .set("spark.sql.test", "")
        .set(
          "spark.sql.hive.metastore.barrierPrefixes",
          "org.apache.spark.sql.hive.execution.PairSerDe")))

/**
* A locally running test instance of Spark's Hive execution engine.
Expand Down