Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,21 @@ import java.nio.ByteBuffer

import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer, Map}
import scala.reflect.runtime.universe
import scala.util.{Try, Success, Failure}

import com.google.common.base.Objects

import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.Token
import org.apache.hadoop.util.StringUtils
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
Expand Down Expand Up @@ -219,6 +223,7 @@ private[spark] class Client(
val dst = new Path(fs.getHomeDirectory(), appStagingDir)
val nns = getNameNodesToAccess(sparkConf) + dst
obtainTokensForNamenodes(nns, hadoopConf, credentials)
obtainTokenForHiveMetastore(hadoopConf, credentials)

val replication = sparkConf.getInt("spark.yarn.submit.file.replication",
fs.getDefaultReplication(dst)).toShort
Expand Down Expand Up @@ -919,6 +924,64 @@ object Client extends Logging {
}
}

/**
* Obtains token for the Hive metastore and adds them to the credentials.
*/
private def obtainTokenForHiveMetastore(conf: Configuration, credentials: Credentials) {
if (UserGroupInformation.isSecurityEnabled) {
val mirror = universe.runtimeMirror(getClass.getClassLoader)

try {
val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
val hive = hiveClass.getMethod("get").invoke(null)

val hiveConf = hiveClass.getMethod("getConf").invoke(hive)
val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")

val hiveConfGet = (param:String) => Option(hiveConfClass
.getMethod("get", classOf[java.lang.String])
.invoke(hiveConf, param))

val metastore_uri = hiveConfGet("hive.metastore.uris")

// Check for local metastore
if (metastore_uri != None && metastore_uri.get.toString.size > 0) {
val metastore_kerberos_principal_conf_var = mirror.classLoader
.loadClass("org.apache.hadoop.hive.conf.HiveConf$ConfVars")
.getField("METASTORE_KERBEROS_PRINCIPAL").get("varname").toString

val principal = hiveConfGet(metastore_kerberos_principal_conf_var)

val username = Option(UserGroupInformation.getCurrentUser().getUserName)
if (principal != None && username != None) {
val tokenStr = hiveClass.getMethod("getDelegationToken",
classOf[java.lang.String], classOf[java.lang.String])
.invoke(hive, username.get, principal.get).asInstanceOf[java.lang.String]

val hive2Token = new Token[DelegationTokenIdentifier]()
hive2Token.decodeFromUrlString(tokenStr)
credentials.addToken(new Text("hive.server2.delegation.token"),hive2Token)
logDebug("Added hive.Server2.delegation.token to conf.")
hiveClass.getMethod("closeCurrent").invoke(null)
} else {
logError("Username or principal == NULL")
logError(s"""username=${username.getOrElse("(NULL)")}""")
logError(s"""principal=${principal.getOrElse("(NULL)")}""")
throw new IllegalArgumentException("username and/or principal is equal to null!")
}
} else {
logDebug("HiveMetaStore configured in localmode")
}
} catch {
case e:java.lang.NoSuchMethodException => { logInfo("Hive Method not found " + e); return }
case e:java.lang.ClassNotFoundException => { logInfo("Hive Class not found " + e); return }
case e:Exception => { logError("Unexpected Exception " + e)
throw new RuntimeException("Unexpected exception", e)
}
}
}
}

/**
* Return whether the two file systems are the same.
*/
Expand Down