Skip to content

Commit 40d3c67

Browse files
steveloughran authored and Marcelo Vanzin committed
[SPARK-11265][YARN] YarnClient can't get tokens to talk to Hive 1.2.1 in a secure cluster
This is a fix for SPARK-11265: the introspection code used to get Hive delegation tokens was failing on Spark 1.5.1+, due to changes in the Hive codebase. Author: Steve Loughran <stevel@hortonworks.com>. Closes #9232 from steveloughran/stevel/patches/SPARK-11265-hive-tokens.
1 parent fc27dfb commit 40d3c67

File tree

4 files changed

+129
-49
lines changed

4 files changed

+129
-49
lines changed

yarn/pom.xml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,31 @@
162162
<artifactId>jersey-server</artifactId>
163163
<scope>test</scope>
164164
</dependency>
165+
166+
<!--
167+
Testing Hive reflection needs hive on the test classpath only.
168+
It doesn't need the spark hive modules, so the -Phive flag is not checked.
169+
-->
170+
<dependency>
171+
<groupId>${hive.group}</groupId>
172+
<artifactId>hive-exec</artifactId>
173+
<scope>test</scope>
174+
</dependency>
175+
<dependency>
176+
<groupId>${hive.group}</groupId>
177+
<artifactId>hive-metastore</artifactId>
178+
<scope>test</scope>
179+
</dependency>
180+
<dependency>
181+
<groupId>org.apache.thrift</groupId>
182+
<artifactId>libthrift</artifactId>
183+
<scope>test</scope>
184+
</dependency>
185+
<dependency>
186+
<groupId>org.apache.thrift</groupId>
187+
<artifactId>libfb303</artifactId>
188+
<scope>test</scope>
189+
</dependency>
165190
</dependencies>
166191

167192
<build>

yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 2 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1337,55 +1337,8 @@ object Client extends Logging {
13371337
conf: Configuration,
13381338
credentials: Credentials) {
13391339
if (shouldGetTokens(sparkConf, "hive") && UserGroupInformation.isSecurityEnabled) {
1340-
val mirror = universe.runtimeMirror(getClass.getClassLoader)
1341-
1342-
try {
1343-
val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
1344-
val hiveConf = hiveConfClass.newInstance()
1345-
1346-
val hiveConfGet = (param: String) => Option(hiveConfClass
1347-
.getMethod("get", classOf[java.lang.String])
1348-
.invoke(hiveConf, param))
1349-
1350-
val metastore_uri = hiveConfGet("hive.metastore.uris")
1351-
1352-
// Check for local metastore
1353-
if (metastore_uri != None && metastore_uri.get.toString.size > 0) {
1354-
val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
1355-
val hive = hiveClass.getMethod("get").invoke(null, hiveConf.asInstanceOf[Object])
1356-
1357-
val metastore_kerberos_principal_conf_var = mirror.classLoader
1358-
.loadClass("org.apache.hadoop.hive.conf.HiveConf$ConfVars")
1359-
.getField("METASTORE_KERBEROS_PRINCIPAL").get("varname").toString
1360-
1361-
val principal = hiveConfGet(metastore_kerberos_principal_conf_var)
1362-
1363-
val username = Option(UserGroupInformation.getCurrentUser().getUserName)
1364-
if (principal != None && username != None) {
1365-
val tokenStr = hiveClass.getMethod("getDelegationToken",
1366-
classOf[java.lang.String], classOf[java.lang.String])
1367-
.invoke(hive, username.get, principal.get).asInstanceOf[java.lang.String]
1368-
1369-
val hive2Token = new Token[DelegationTokenIdentifier]()
1370-
hive2Token.decodeFromUrlString(tokenStr)
1371-
credentials.addToken(new Text("hive.server2.delegation.token"), hive2Token)
1372-
logDebug("Added hive.Server2.delegation.token to conf.")
1373-
hiveClass.getMethod("closeCurrent").invoke(null)
1374-
} else {
1375-
logError("Username or principal == NULL")
1376-
logError(s"""username=${username.getOrElse("(NULL)")}""")
1377-
logError(s"""principal=${principal.getOrElse("(NULL)")}""")
1378-
throw new IllegalArgumentException("username and/or principal is equal to null!")
1379-
}
1380-
} else {
1381-
logDebug("HiveMetaStore configured in localmode")
1382-
}
1383-
} catch {
1384-
case e: java.lang.NoSuchMethodException => { logInfo("Hive Method not found " + e); return }
1385-
case e: java.lang.ClassNotFoundException => { logInfo("Hive Class not found " + e); return }
1386-
case e: Exception => { logError("Unexpected Exception " + e)
1387-
throw new RuntimeException("Unexpected exception", e)
1388-
}
1340+
YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(conf).foreach {
1341+
credentials.addToken(new Text("hive.server2.delegation.token"), _)
13891342
}
13901343
}
13911344
}

yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,17 @@ import java.util.regex.Matcher
2222
import java.util.regex.Pattern
2323

2424
import scala.collection.mutable.HashMap
25+
import scala.reflect.runtime._
2526
import scala.util.Try
2627

2728
import org.apache.hadoop.conf.Configuration
2829
import org.apache.hadoop.fs.Path
30+
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
2931
import org.apache.hadoop.io.Text
3032
import org.apache.hadoop.mapred.{Master, JobConf}
3133
import org.apache.hadoop.security.Credentials
3234
import org.apache.hadoop.security.UserGroupInformation
35+
import org.apache.hadoop.security.token.Token
3336
import org.apache.hadoop.yarn.conf.YarnConfiguration
3437
import org.apache.hadoop.yarn.api.ApplicationConstants
3538
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@ -142,6 +145,76 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
142145
val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
143146
ConverterUtils.toContainerId(containerIdString)
144147
}
148+
149+
/**
150+
* Obtains token for the Hive metastore, using the current user as the principal.
151+
* Some exceptions are caught and downgraded to a log message.
152+
* @param conf hadoop configuration; the Hive configuration will be based on this
153+
* @return a token, or `None` if there's no need for a token (no metastore URI or principal
154+
* in the config), or if a binding exception was caught and downgraded.
155+
*/
156+
def obtainTokenForHiveMetastore(conf: Configuration): Option[Token[DelegationTokenIdentifier]] = {
157+
try {
158+
obtainTokenForHiveMetastoreInner(conf, UserGroupInformation.getCurrentUser().getUserName)
159+
} catch {
160+
case e: ClassNotFoundException =>
161+
logInfo(s"Hive class not found $e")
162+
logDebug("Hive class not found", e)
163+
None
164+
}
165+
}
166+
167+
/**
168+
* Inner routine to obtain a token for the Hive metastore; exceptions are raised on any problem.
169+
* @param conf hadoop configuration; the Hive configuration will be based on this.
170+
* @param username the username of the principal requesting the delegating token.
171+
* @return a delegation token
172+
*/
173+
private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration,
174+
username: String): Option[Token[DelegationTokenIdentifier]] = {
175+
val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
176+
177+
// the hive configuration class is a subclass of Hadoop Configuration, so can be cast down
178+
// to a Configuration and used without reflection
179+
val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
180+
// using the (Configuration, Class) constructor allows the current configuration to be included
181+
// in the hive config.
182+
val ctor = hiveConfClass.getDeclaredConstructor(classOf[Configuration],
183+
classOf[Object].getClass)
184+
val hiveConf = ctor.newInstance(conf, hiveConfClass).asInstanceOf[Configuration]
185+
val metastoreUri = hiveConf.getTrimmed("hive.metastore.uris", "")
186+
187+
// Check for local metastore
188+
if (metastoreUri.nonEmpty) {
189+
require(username.nonEmpty, "Username undefined")
190+
val principalKey = "hive.metastore.kerberos.principal"
191+
val principal = hiveConf.getTrimmed(principalKey, "")
192+
require(principal.nonEmpty, s"Hive principal $principalKey undefined")
193+
logDebug(s"Getting Hive delegation token for $username against $principal at $metastoreUri")
194+
val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
195+
val closeCurrent = hiveClass.getMethod("closeCurrent")
196+
try {
197+
// get all the instance methods before invoking any
198+
val getDelegationToken = hiveClass.getMethod("getDelegationToken",
199+
classOf[String], classOf[String])
200+
val getHive = hiveClass.getMethod("get", hiveConfClass)
201+
202+
// invoke
203+
val hive = getHive.invoke(null, hiveConf)
204+
val tokenStr = getDelegationToken.invoke(hive, username, principal).asInstanceOf[String]
205+
val hive2Token = new Token[DelegationTokenIdentifier]()
206+
hive2Token.decodeFromUrlString(tokenStr)
207+
Some(hive2Token)
208+
} finally {
209+
Utils.tryLogNonFatalError {
210+
closeCurrent.invoke(null)
211+
}
212+
}
213+
} else {
214+
logDebug("HiveMetaStore configured in localmode")
215+
None
216+
}
217+
}
145218
}
146219

147220
object YarnSparkHadoopUtil {

yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
package org.apache.spark.deploy.yarn
1919

2020
import java.io.{File, IOException}
21+
import java.lang.reflect.InvocationTargetException
2122

2223
import com.google.common.io.{ByteStreams, Files}
2324
import org.apache.hadoop.conf.Configuration
2425
import org.apache.hadoop.fs.Path
26+
import org.apache.hadoop.hive.ql.metadata.HiveException
2527
import org.apache.hadoop.yarn.api.ApplicationConstants
2628
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
2729
import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -245,4 +247,31 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
245247
System.clearProperty("SPARK_YARN_MODE")
246248
}
247249
}
250+
251+
test("Obtain tokens For HiveMetastore") {
252+
val hadoopConf = new Configuration()
253+
hadoopConf.set("hive.metastore.kerberos.principal", "bob")
254+
// thrift picks up on port 0 and bails out, without trying to talk to endpoint
255+
hadoopConf.set("hive.metastore.uris", "http://localhost:0")
256+
val util = new YarnSparkHadoopUtil
257+
assertNestedHiveException(intercept[InvocationTargetException] {
258+
util.obtainTokenForHiveMetastoreInner(hadoopConf, "alice")
259+
})
260+
// expect exception trapping code to unwind this hive-side exception
261+
assertNestedHiveException(intercept[InvocationTargetException] {
262+
util.obtainTokenForHiveMetastore(hadoopConf)
263+
})
264+
}
265+
266+
def assertNestedHiveException(e: InvocationTargetException): Throwable = {
267+
val inner = e.getCause
268+
if (inner == null) {
269+
fail("No inner cause", e)
270+
}
271+
if (!inner.isInstanceOf[HiveException]) {
272+
fail("Not a hive exception", inner)
273+
}
274+
inner
275+
}
276+
248277
}

0 commit comments

Comments
 (0)