yarn/pom.xml (25 additions, 0 deletions)
@@ -162,6 +162,31 @@
<artifactId>jersey-server</artifactId>
<scope>test</scope>
</dependency>

<!--
Hive reflection tests need Hive on the test classpath only.
They don't need the Spark Hive modules, so the -Phive flag is not checked.
-->
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-exec</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-metastore</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libfb303</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala (2 additions, 49 deletions)
@@ -1337,55 +1337,8 @@ object Client extends Logging {
conf: Configuration,
credentials: Credentials) {
if (shouldGetTokens(sparkConf, "hive") && UserGroupInformation.isSecurityEnabled) {
Comment:
Does it make sense to add a check for isClusterMode too?
I believe we don't need a delegation token if we are running in client mode.
The driver will have access to the submitting user's Kerberos tokens.

Comment (Contributor Author):
yeah, probably does. In fact, getting a delegation token there is potentially worse than useless, as that DT will eventually expire. Using the keytab directly will, provided the user keeps logging in, stay valid.

Comment (Contributor Author):
As @tgravescs points out, the tokens are needed throughout the cluster, and yes, must be obtained irrespective of deployment.

Comment (Contributor):
Actually I don't really think the delegation tokens are needed in client mode (for Hive only). Only the driver talks to the metastore.
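
For illustration, the suggested guard might look like the sketch below. This is hypothetical: `clusterMode` is an assumed flag available at this call site, and the thread above shows the check's correctness was still being debated.

```scala
// Hypothetical variant of the guard discussed above: in client mode the driver
// holds the submitting user's Kerberos TGT, so a metastore delegation token
// would only be fetched for cluster-mode submissions.
if (clusterMode && shouldGetTokens(sparkConf, "hive") &&
    UserGroupInformation.isSecurityEnabled) {
  // ...token retrieval as in the surrounding code...
}
```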

val mirror = universe.runtimeMirror(getClass.getClassLoader)

try {
val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
val hiveConf = hiveConfClass.newInstance()

val hiveConfGet = (param: String) => Option(hiveConfClass
.getMethod("get", classOf[java.lang.String])
.invoke(hiveConf, param))

val metastore_uri = hiveConfGet("hive.metastore.uris")

// Check for local metastore
if (metastore_uri != None && metastore_uri.get.toString.size > 0) {
val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
val hive = hiveClass.getMethod("get").invoke(null, hiveConf.asInstanceOf[Object])

val metastore_kerberos_principal_conf_var = mirror.classLoader
.loadClass("org.apache.hadoop.hive.conf.HiveConf$ConfVars")
.getField("METASTORE_KERBEROS_PRINCIPAL").get("varname").toString

val principal = hiveConfGet(metastore_kerberos_principal_conf_var)

val username = Option(UserGroupInformation.getCurrentUser().getUserName)
if (principal != None && username != None) {
val tokenStr = hiveClass.getMethod("getDelegationToken",
classOf[java.lang.String], classOf[java.lang.String])
.invoke(hive, username.get, principal.get).asInstanceOf[java.lang.String]

val hive2Token = new Token[DelegationTokenIdentifier]()
hive2Token.decodeFromUrlString(tokenStr)
credentials.addToken(new Text("hive.server2.delegation.token"), hive2Token)
logDebug("Added hive.Server2.delegation.token to conf.")
hiveClass.getMethod("closeCurrent").invoke(null)
} else {
logError("Username or principal == NULL")
logError(s"""username=${username.getOrElse("(NULL)")}""")
logError(s"""principal=${principal.getOrElse("(NULL)")}""")
throw new IllegalArgumentException("username and/or principal is equal to null!")
}
} else {
logDebug("HiveMetaStore configured in localmode")
}
} catch {
case e: java.lang.NoSuchMethodException => { logInfo("Hive Method not found " + e); return }
case e: java.lang.ClassNotFoundException => { logInfo("Hive Class not found " + e); return }
case e: Exception => { logError("Unexpected Exception " + e)
throw new RuntimeException("Unexpected exception", e)
}
YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(conf).foreach {
Comment (Contributor Author):
this should go to Utils.getContextOrSparkClassLoader(); it's notable that scalastyle doesn't pick up on this, even though it has rejected Class.forName() since SPARK-8962

credentials.addToken(new Text("hive.server2.delegation.token"), _)
}
}
}
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -22,14 +22,17 @@ import java.util.regex.Matcher
import java.util.regex.Pattern

import scala.collection.mutable.HashMap
import scala.reflect.runtime._
import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.{Master, JobConf}
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.token.Token
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@ -142,6 +145,76 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil
val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
ConverterUtils.toContainerId(containerIdString)
}

/**
* Obtains a token for the Hive metastore, using the current user as the principal.
* Some exceptions are caught and downgraded to a log message.
* @param conf hadoop configuration; the Hive configuration will be based on this
* @return a token, or `None` if there's no need for a token (no metastore URI or principal
* in the config), or if a binding exception was caught and downgraded.
*/
def obtainTokenForHiveMetastore(conf: Configuration): Option[Token[DelegationTokenIdentifier]] = {
try {
obtainTokenForHiveMetastoreInner(conf, UserGroupInformation.getCurrentUser().getUserName)
} catch {
case e: ClassNotFoundException =>
Comment (Contributor):
I kinda feel that everything but this exception should be propagated. That's really the only valid error you might expect; someone running Spark without Hive classes, and even that might change (I think there's a bug tracking removal of the hive profile from the maven build).

Every other error means that either you have the wrong version of Hive in the classpath (user error) or that your configuration says you need delegation tokens, but you can't get them for some reason.

Comment (Contributor Author):
+1. I'd left it in there as it may have had a valid reason for being there, but I do think it's correct. Detecting config problems is something to throw up.

Note that Client.obtainTokenForHBase() has similar behaviour; this patch doesn't address it. When someone sits down to do it, the policy about how to react to failures could be converted into a wrapper around a closure which executes the token retrieval (here obtainTokenForHiveMetastoreInner), so there'd be no divergence.
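
A minimal sketch of that wrapper (names here are hypothetical, not part of this patch; it assumes a home in a class mixing in Spark's Logging trait):

```scala
// Hypothetical helper: centralizes the "log and downgrade ClassNotFoundException,
// propagate everything else" policy so Hive and HBase token retrieval could share it.
private def obtainTokenWithBinding[T](service: String)(retrieve: => Option[T]): Option[T] = {
  try {
    retrieve
  } catch {
    case e: ClassNotFoundException =>
      logInfo(s"$service classes not found: $e")
      logDebug(s"$service classes not found", e)
      None
  }
}

// Usage (sketch):
// obtainTokenWithBinding("Hive") {
//   obtainTokenForHiveMetastoreInner(conf, UserGroupInformation.getCurrentUser().getUserName)
// }
```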

logInfo(s"Hive class not found $e")
logDebug("Hive class not found", e)
Comment (Contributor):
Why double-log the message?

Comment (Contributor):
They're not exactly the same: the info-level line only includes the exception's toString, while the debug-level call logs the full stack trace.

None
}
}

/**
* Inner routine to obtain a token for the Hive metastore; exceptions are raised on any problem.
* @param conf hadoop configuration; the Hive configuration will be based on this.
* @param username the username of the principal requesting the delegation token.
* @return a delegation token
*/
private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration,
username: String): Option[Token[DelegationTokenIdentifier]] = {
val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)

// the hive configuration class is a subclass of Hadoop Configuration, so can be cast down
// to a Configuration and used without reflection
val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
// using the (Configuration, Class) constructor allows the current configuration to be
// included in the hive config.
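// note: classOf[Object].getClass evaluates to java.lang.Class itself, which is the
// Class[_] parameter type of the HiveConf(Configuration, Class) constructor used here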
val ctor = hiveConfClass.getDeclaredConstructor(classOf[Configuration],
classOf[Object].getClass)
val hiveConf = ctor.newInstance(conf, hiveConfClass).asInstanceOf[Configuration]
val metastoreUri = hiveConf.getTrimmed("hive.metastore.uris", "")

// Check for local metastore
if (metastoreUri.nonEmpty) {
require(username.nonEmpty, "Username undefined")
val principalKey = "hive.metastore.kerberos.principal"
val principal = hiveConf.getTrimmed(principalKey, "")
require(principal.nonEmpty, s"Hive principal $principalKey undefined")
logDebug(s"Getting Hive delegation token for $username against $principal at $metastoreUri")
val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
val closeCurrent = hiveClass.getMethod("closeCurrent")
try {
// get all the instance methods before invoking any
val getDelegationToken = hiveClass.getMethod("getDelegationToken",
classOf[String], classOf[String])
val getHive = hiveClass.getMethod("get", hiveConfClass)

// invoke
val hive = getHive.invoke(null, hiveConf)
val tokenStr = getDelegationToken.invoke(hive, username, principal).asInstanceOf[String]
val hive2Token = new Token[DelegationTokenIdentifier]()
hive2Token.decodeFromUrlString(tokenStr)
Some(hive2Token)
} finally {
Utils.tryLogNonFatalError {
closeCurrent.invoke(null)
}
}
} else {
logDebug("HiveMetaStore configured in localmode")
Comment (Contributor):
"local mode"?

Comment (Contributor Author):
I have no idea, but it's what the original code said.
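
(For context: an empty hive.metastore.uris means Hive uses an embedded, in-process metastore, i.e. Hive's "local" metastore mode, so there is no remote metastore service to issue a delegation token.)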

None
}
}
}

object YarnSparkHadoopUtil {
yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -18,10 +18,12 @@
package org.apache.spark.deploy.yarn

import java.io.{File, IOException}
import java.lang.reflect.InvocationTargetException

import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.metadata.HiveException
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -245,4 +247,31 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
System.clearProperty("SPARK_YARN_MODE")
}
}

test("Obtain tokens For HiveMetastore") {
val hadoopConf = new Configuration()
hadoopConf.set("hive.metastore.kerberos.principal", "bob")
// thrift picks up on port 0 and bails out, without trying to talk to the endpoint
hadoopConf.set("hive.metastore.uris", "http://localhost:0")
val util = new YarnSparkHadoopUtil
Comment (Contributor):
YarnSparkHadoopUtil.get (although you would need to set SPARK_YARN_MODE in system properties for that, so maybe it's ok to leave as is).

Comment (Contributor Author):
all the other tests do the same; if you want a switch it may as well be across the suite

Comment (Contributor):
nah, it's ok to leave as is.

assertNestedHiveException(intercept[InvocationTargetException] {
util.obtainTokenForHiveMetastoreInner(hadoopConf, "alice")
})
// expect exception trapping code to unwind this hive-side exception
assertNestedHiveException(intercept[InvocationTargetException] {
util.obtainTokenForHiveMetastore(hadoopConf)
})
}

def assertNestedHiveException(e: InvocationTargetException): Throwable = {
val inner = e.getCause
if (inner == null) {
fail("No inner cause", e)
Comment (Contributor):
This is why we have asserts.

Comment (Contributor Author):
good point

}
if (!inner.isInstanceOf[HiveException]) {
fail("Not a hive exception", inner)
}
inner
}

}