docs/running-on-yarn.md (7 additions, 0 deletions)

@@ -106,6 +106,13 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
     set this configuration to "hdfs:///some/path".
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.access.namenodes</code></td>
+  <td>(none)</td>
+  <td>
+    A list of secure HDFS namenodes your Spark application is going to access. For example, `spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032`. The Spark application must have access to the namenodes listed, and Kerberos must be properly configured to be able to access them (either in the same realm or in a trusted realm). Spark acquires security tokens for each of the namenodes so that the Spark application can access those remote HDFS clusters.
+  </td>
+</tr>
 </table>
 
 # Launching Spark on YARN

[Inline review comment from a Contributor, on the new spark.yarn.access.namenodes description]
LGTM.
BTW, I know it's the current style of the file, but these long lines could use some breaks.
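To make the setting concrete, here is a hedged sketch (not part of the patch) of supplying the property when building a SparkConf; the nn1.com/nn2.com URIs are the placeholder hosts from the description above, and the object and app name are hypothetical:

    import org.apache.spark.SparkConf

    object NamenodesConfSketch {
      // Sketch: list every secure HDFS cluster the job will touch, comma-separated.
      val sparkConf = new SparkConf()
        .setAppName("multi-cluster-example") // hypothetical app name
        .set("spark.yarn.access.namenodes", "hdfs://nn1.com:8032,hdfs://nn2.com:8032")
    }

The same value can also be passed at submission time with --conf spark.yarn.access.namenodes=... on the spark-submit command line.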
ClientBase.scala

@@ -29,7 +29,7 @@ import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.permission.FsPermission
 import org.apache.hadoop.mapred.Master
 import org.apache.hadoop.mapreduce.MRJobConfig
-import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.util.StringUtils
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@ -191,23 +191,11 @@ trait ClientBase extends Logging {
     // Upload Spark and the application JAR to the remote file system if necessary. Add them as
     // local resources to the application master.
     val fs = FileSystem.get(conf)
-
-    val delegTokenRenewer = Master.getMasterPrincipal(conf)
-    if (UserGroupInformation.isSecurityEnabled()) {
-      if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
-        val errorMessage = "Can't get Master Kerberos principal for use as renewer"
-        logError(errorMessage)
-        throw new SparkException(errorMessage)
-      }
-    }
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
-    val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort
-
-    if (UserGroupInformation.isSecurityEnabled()) {
-      val dstFs = dst.getFileSystem(conf)
-      dstFs.addDelegationTokens(delegTokenRenewer, credentials)
-    }
+    val nns = ClientBase.getNameNodesToAccess(sparkConf) + dst
+    ClientBase.obtainTokensForNamenodes(nns, conf, credentials)
 
+    val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort
     val localResources = HashMap[String, LocalResource]()
     FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
 
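One effect of this refactoring, shown as a hedged sketch (the names mirror the surrounding method's state and nothing here is part of the patch): the security check now lives inside obtainTokensForNamenodes, so the submission path needs no UserGroupInformation guard of its own and works unchanged on insecure clusters.

    // Sketch: on a cluster without Kerberos the helper is a no-op, so the
    // credentials object is simply left untouched.
    ClientBase.obtainTokensForNamenodes(Set(new Path("hdfs://nn1:8032")), conf, credentials)
    // credentials gains delegation tokens only when
    // UserGroupInformation.isSecurityEnabled() is true.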
@@ -613,4 +601,40 @@ object ClientBase extends Logging {
     YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, path,
       File.pathSeparator)
 
+  /**
+   * Get the list of namenodes the user may access.
+   */
+  private[yarn] def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
+    sparkConf.get("spark.yarn.access.namenodes", "").split(",").map(_.trim()).filter(!_.isEmpty)
+      .map(new Path(_)).toSet
+  }
+
+  /** Get the Kerberos principal (the YARN ResourceManager's) that may renew our tokens. */
+  private[yarn] def getTokenRenewer(conf: Configuration): String = {
+    val delegTokenRenewer = Master.getMasterPrincipal(conf)
+    logDebug("delegation token renewer is: " + delegTokenRenewer)
+    if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
+      val errorMessage = "Can't get Master Kerberos principal for use as renewer"
+      logError(errorMessage)
+      throw new SparkException(errorMessage)
+    }
+    delegTokenRenewer
+  }
+
+  /**
+   * Obtains tokens for the namenodes passed in and adds them to the credentials.
+   */
+  private[yarn] def obtainTokensForNamenodes(paths: Set[Path], conf: Configuration,
+      creds: Credentials) {
+    if (UserGroupInformation.isSecurityEnabled()) {
+      val delegTokenRenewer = getTokenRenewer(conf)
+
+      paths.foreach { dst =>
+        val dstFs = dst.getFileSystem(conf)
+        logDebug("getting token for namenode: " + dst)
+        dstFs.addDelegationTokens(delegTokenRenewer, creds)
+      }
+    }
+  }
+
 }
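Putting the two helpers together, a hedged usage sketch (not part of the patch): the helpers are private[yarn], so a real caller must live in org.apache.spark.deploy.yarn, and the namenode URIs below are placeholders.

    package org.apache.spark.deploy.yarn // required: the helpers are private[yarn]

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.security.Credentials
    import org.apache.spark.SparkConf

    object TokenSketch {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf()
          .set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032")

        // Parse the comma-separated list into Paths; blank entries are dropped.
        val nns = ClientBase.getNameNodesToAccess(sparkConf)

        // On a kerberized cluster this fetches an HDFS delegation token per
        // namenode, renewable by the RM principal that getTokenRenewer resolves;
        // it throws SparkException if that principal cannot be determined.
        val creds = new Credentials()
        ClientBase.obtainTokensForNamenodes(nns, new Configuration(), creds)
      }
    }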
ClientBaseSuite.scala

@@ -31,14 +31,16 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.mockito.Matchers._
 import org.mockito.Mockito._
+
 
 import org.scalatest.FunSuite
 import org.scalatest.Matchers
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ HashMap => MutableHashMap }
 import scala.util.Try
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkException, SparkConf}
 import org.apache.spark.util.Utils
 
 class ClientBaseSuite extends FunSuite with Matchers {
@@ -138,6 +140,57 @@ class ClientBaseSuite extends FunSuite with Matchers {
     }
   }
 
+  test("check access nns empty") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.yarn.access.namenodes", "")
+    val nns = ClientBase.getNameNodesToAccess(sparkConf)
+    nns should be(Set())
+  }
+
+  test("check access nns unset") {
+    val sparkConf = new SparkConf()
+    val nns = ClientBase.getNameNodesToAccess(sparkConf)
+    nns should be(Set())
+  }
+
+  test("check access nns") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032")
+    val nns = ClientBase.getNameNodesToAccess(sparkConf)
+    nns should be(Set(new Path("hdfs://nn1:8032")))
+  }
+
+  test("check access nns space") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ")
+    val nns = ClientBase.getNameNodesToAccess(sparkConf)
+    nns should be(Set(new Path("hdfs://nn1:8032")))
+  }
+
+  test("check access two nns") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032")
+    val nns = ClientBase.getNameNodesToAccess(sparkConf)
+    nns should be(Set(new Path("hdfs://nn1:8032"), new Path("hdfs://nn2:8032")))
+  }
+
+  test("check token renewer") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
+    hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
+    val renewer = ClientBase.getTokenRenewer(hadoopConf)
+    renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
+  }
+
+  test("check token renewer default") {
+    val hadoopConf = new Configuration()
+    val caught =
+      intercept[SparkException] {
+        ClientBase.getTokenRenewer(hadoopConf)
+      }
+    assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
+  }
+
   object Fixtures {
 
     val knownDefYarnAppCP: Seq[String] =