From bf356d28773b16637cedf38c2efdfeb42c131e56 Mon Sep 17 00:00:00 2001 From: Doug Balog Date: Sun, 15 Mar 2015 03:07:54 -0400 Subject: [PATCH 1/3] [SPARK-6207] [YARN] [SQL] Adds delegation tokens for metastore to conf. Adds hive2-metastore delagations token to conf when running in securemode. Without this change, runing on YARN in cluster mode fails with a GSS exception. This contribution is my original work and that I licenses the work to the Apache Spark project under the project's open source licenses. Author: Doug Balog --- yarn/pom.xml | 4 +++ .../org/apache/spark/deploy/yarn/Client.scala | 29 +++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/yarn/pom.xml b/yarn/pom.xml index c13534f0410a1..27bf041ae702b 100644 --- a/yarn/pom.xml +++ b/yarn/pom.xml @@ -58,6 +58,10 @@ org.apache.hadoop hadoop-client + + ${hive.group} + hive-exec + diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 61f8fc3f5a014..e5488bb4b6b89 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -28,11 +28,15 @@ import com.google.common.base.Objects import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier +import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.fs._ import org.apache.hadoop.fs.permission.FsPermission +import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.Master import org.apache.hadoop.mapreduce.MRJobConfig import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.security.token.Token import org.apache.hadoop.util.StringUtils import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment @@ -217,6 +221,7 @@ private[spark] class Client( val dst = new Path(fs.getHomeDirectory(), appStagingDir) val nns = getNameNodesToAccess(sparkConf) + dst obtainTokensForNamenodes(nns, hadoopConf, credentials) + obtainTokenForHiveMetastore(hadoopConf, credentials) val replication = sparkConf.getInt("spark.yarn.submit.file.replication", fs.getDefaultReplication(dst)).toShort @@ -902,6 +907,30 @@ object Client extends Logging { } } + /** + * Obtains token for the Hive metastore and adds them to the credentials. + */ + private def obtainTokenForHiveMetastore(conf: Configuration, credentials: Credentials) { + if (UserGroupInformation.isSecurityEnabled /* And Hive is enabled */) { + val hc = org.apache.hadoop.hive.ql.metadata.Hive.get + val principal = hc.getConf().get(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname) + val username = UserGroupInformation.getCurrentUser().getUserName + + if (principal == null) { + val errorMessage = "Required hive metastore principal is not configured!" + logError(errorMessage) + throw new IllegalArgumentException(errorMessage) + } + + val tokenStr = hc.getDelegationToken(username,principal) + val hive2Token = new Token[DelegationTokenIdentifier]() + hive2Token.decodeFromUrlString(tokenStr) + credentials.addToken(new Text("hive.server2.delegation.token"), hive2Token) + logDebug("Added the Hive Server 2 token to conf.") + org.apache.hadoop.hive.ql.metadata.Hive.closeCurrent + } + } + /** * Return whether the two file systems are the same. */ From e260765c0b446b9fb2842b2d942a613f64198524 Mon Sep 17 00:00:00 2001 From: Doug Balog Date: Tue, 7 Apr 2015 22:30:52 -0400 Subject: [PATCH 2/3] [SPARK-6207] Second pass at adding Hive delegation token to conf. - Use reflection instead of adding dependency on hive. - Tested on Hive 0.13 and Hadoop 2.4.1 --- yarn/pom.xml | 4 -- .../org/apache/spark/deploy/yarn/Client.scala | 68 ++++++++++++++----- 2 files changed, 51 insertions(+), 21 deletions(-) diff --git a/yarn/pom.xml b/yarn/pom.xml index c3f9246bf57c9..7c8c3613e7a05 100644 --- a/yarn/pom.xml +++ b/yarn/pom.xml @@ -58,10 +58,6 @@ org.apache.hadoop hadoop-client - - ${hive.group} - hive-exec - diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 0f21de29a1b1e..d6137b21f2e78 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -22,6 +22,7 @@ import java.nio.ByteBuffer import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer, Map} +import scala.reflect.runtime.universe import scala.util.{Try, Success, Failure} import com.google.common.base.Objects @@ -29,7 +30,6 @@ import com.google.common.base.Objects import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier -import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.fs._ import org.apache.hadoop.fs.permission.FsPermission import org.apache.hadoop.io.Text @@ -928,23 +928,57 @@ object Client extends Logging { * Obtains token for the Hive metastore and adds them to the credentials. */ private def obtainTokenForHiveMetastore(conf: Configuration, credentials: Credentials) { - if (UserGroupInformation.isSecurityEnabled /* And Hive is enabled */) { - val hc = org.apache.hadoop.hive.ql.metadata.Hive.get - val principal = hc.getConf().get(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname) - val username = UserGroupInformation.getCurrentUser().getUserName - - if (principal == null) { - val errorMessage = "Required hive metastore principal is not configured!" - logError(errorMessage) - throw new IllegalArgumentException(errorMessage) - } + if (UserGroupInformation.isSecurityEnabled) { + val mirror = universe.runtimeMirror(getClass.getClassLoader) - val tokenStr = hc.getDelegationToken(username,principal) - val hive2Token = new Token[DelegationTokenIdentifier]() - hive2Token.decodeFromUrlString(tokenStr) - credentials.addToken(new Text("hive.server2.delegation.token"), hive2Token) - logDebug("Added the Hive Server 2 token to conf.") - org.apache.hadoop.hive.ql.metadata.Hive.closeCurrent + try { + val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive") + val hive = hiveClass.getMethod("get").invoke(null) + + val hiveConf = hiveClass.getMethod("getConf").invoke(hive) + val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf") + + val hiveConfGet = (param:String) => Option(hiveConfClass + .getMethod("get",classOf[java.lang.String]) + .invoke(hiveConf, param)) + + val metastore_uri = hiveConfGet("hive.metastore.uris") + + // Check for local metastore + if(metastore_uri != None && metastore_uri.get.toString.size > 0) { + val metastore_kerberos_principal_conf_var = mirror.classLoader + .loadClass("org.apache.hadoop.hive.conf.HiveConf$ConfVars") + .getField("METASTORE_KERBEROS_PRINCIPAL").get("varname").toString + + val principal = hiveConfGet(metastore_kerberos_principal_conf_var) + + val username = Option(UserGroupInformation.getCurrentUser().getUserName) + if(principal != None && username != None){ + val tokenStr = hiveClass.getMethod("getDelegationToken", + classOf[java.lang.String], classOf[java.lang.String]) + .invoke(hive, username.get, principal.get).asInstanceOf[java.lang.String] + + val hive2Token = new Token[DelegationTokenIdentifier]() + hive2Token.decodeFromUrlString(tokenStr) + credentials.addToken(new Text("hive.server2.delegation.token"),hive2Token) + logDebug("Added hive.Server2.delegation.token to conf.") + hiveClass.getMethod("closeCurrent").invoke(null) + } else { + logError("Username or principal == NULL") + logError(s"""username=${username.getOrElse("(NULL)")}""") + logError(s"""principal=${principal.getOrElse("(NULL)")}""") + throw new IllegalArgumentException("username and/or principal is equal to null!") + } + } else { + logDebug("HiveMetaStore configured in localmode") + } + } catch { + case e:java.lang.NoSuchMethodException => { logInfo("Hive Method not found " + e); return } + case e:java.lang.ClassNotFoundException => { logInfo("Hive Class not found " + e); return } + case e:Exception => { logError("Unexpected Exception " + e) + throw new RuntimeException("Unexpected exception",e) + } + } } } From 3e9ac16a8e2e0bdd11f850428115bdc891874b1e Mon Sep 17 00:00:00 2001 From: Doug Balog Date: Fri, 10 Apr 2015 15:53:24 -0400 Subject: [PATCH 3/3] [SPARK-6207] Fixes minor code spacing issues. --- .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index d6137b21f2e78..741814634fbfa 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -939,13 +939,13 @@ object Client extends Logging { val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf") val hiveConfGet = (param:String) => Option(hiveConfClass - .getMethod("get",classOf[java.lang.String]) + .getMethod("get", classOf[java.lang.String]) .invoke(hiveConf, param)) val metastore_uri = hiveConfGet("hive.metastore.uris") // Check for local metastore - if(metastore_uri != None && metastore_uri.get.toString.size > 0) { + if (metastore_uri != None && metastore_uri.get.toString.size > 0) { val metastore_kerberos_principal_conf_var = mirror.classLoader .loadClass("org.apache.hadoop.hive.conf.HiveConf$ConfVars") .getField("METASTORE_KERBEROS_PRINCIPAL").get("varname").toString @@ -953,7 +953,7 @@ object Client extends Logging { val principal = hiveConfGet(metastore_kerberos_principal_conf_var) val username = Option(UserGroupInformation.getCurrentUser().getUserName) - if(principal != None && username != None){ + if (principal != None && username != None) { val tokenStr = hiveClass.getMethod("getDelegationToken", classOf[java.lang.String], classOf[java.lang.String]) .invoke(hive, username.get, principal.get).asInstanceOf[java.lang.String] @@ -976,7 +976,7 @@ object Client extends Logging { case e:java.lang.NoSuchMethodException => { logInfo("Hive Method not found " + e); return } case e:java.lang.ClassNotFoundException => { logInfo("Hive Class not found " + e); return } case e:Exception => { logError("Unexpected Exception " + e) - throw new RuntimeException("Unexpected exception",e) + throw new RuntimeException("Unexpected exception", e) } } }