From 0540d38fabe08cb63aecd9df953bed5cbe3bfa62 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Thu, 5 Feb 2015 13:49:47 -0800
Subject: [PATCH 1/5] [SPARK-5493] [core] Add option to impersonate user.

Hadoop has a feature that allows users to impersonate other users
when submitting applications or talking to HDFS, for example. These
impersonated users are generally referred to as "proxy users".

Services such as Oozie or Hive use this feature to run applications
as the requesting user.

This change makes SparkSubmit accept a new command line option to run
the application as a proxy user. It also fixes the plumbing of the user
name through the UI (and a couple of other places) to refer to the
correct user running the application, which can be different from
`sys.props("user.name")` even without proxies (e.g. when using Kerberos).
---
 bin/utils.sh                                       |  3 ++-
 bin/windows-utils.cmd                              |  1 +
 .../org/apache/spark/SecurityManager.scala         |  3 ++-
 .../scala/org/apache/spark/SparkContext.scala      | 16 +++++----------
 .../apache/spark/deploy/SparkHadoopUtil.scala      | 19 +++++++-----------
 .../org/apache/spark/deploy/SparkSubmit.scala      | 20 ++++++++++++++-----
 .../spark/deploy/SparkSubmitArguments.scala        |  7 +++++++
 .../scala/org/apache/spark/util/Utils.scala        | 11 ++++++++++
 8 files changed, 50 insertions(+), 30 deletions(-)

diff --git a/bin/utils.sh b/bin/utils.sh
index 224120008201..88eb84d86b79 100755
--- a/bin/utils.sh
+++ b/bin/utils.sh
@@ -35,7 +35,8 @@ function gatherSparkSubmitOpts() {
     --master | --deploy-mode | --class | --name | --jars | --packages | --py-files | --files | \
     --conf | --repositories | --properties-file | --driver-memory | --driver-java-options | \
     --driver-library-path | --driver-class-path | --executor-memory | --driver-cores | \
-    --total-executor-cores | --executor-cores | --queue | --num-executors | --archives)
+    --total-executor-cores | --executor-cores | --queue | --num-executors | --archives |
+      --proxy-user)
       if [[ $# -lt 2 ]]; then
         "$SUBMIT_USAGE_FUNCTION"
         exit 1;
diff --git a/bin/windows-utils.cmd b/bin/windows-utils.cmd
index 567b8733f7f7..0cf9e87ca554 100644
--- a/bin/windows-utils.cmd
+++ b/bin/windows-utils.cmd
@@ -33,6 +33,7 @@ SET opts="%opts:~1,-1% \<--conf\> \<--properties-file\> \<--driver-memory\> \<--
 SET opts="%opts:~1,-1% \<--driver-library-path\> \<--driver-class-path\> \<--executor-memory\>"
 SET opts="%opts:~1,-1% \<--driver-cores\> \<--total-executor-cores\> \<--executor-cores\> \<--queue\>"
 SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\> \<--packages\> \<--repositories\>"
+SET opts="%opts:~1,-1% \<--proxy-user\>"

 echo %1 | findstr %opts% >nul
 if %ERRORLEVEL% equ 0 (
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 88d35a4bacc6..3653f724ba19 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.io.Text

 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.network.sasl.SecretKeyHolder
+import org.apache.spark.util.Utils

 /**
  * Spark class responsible for security.
@@ -203,7 +204,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf)

   // always add the current user and SPARK_USER to the viewAcls
   private val defaultAclUsers = Set[String](System.getProperty("user.name", ""),
-    Option(System.getenv("SPARK_USER")).getOrElse("")).filter(!_.isEmpty)
+    Utils.getCurrentUserName())
   setViewAcls(defaultAclUsers, sparkConf.get("spark.ui.view.acls", ""))
   setModifyAcls(defaultAclUsers, sparkConf.get("spark.modify.acls", ""))
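The hunk above replaces an ad-hoc `user.name`/`SPARK_USER` lookup with the `Utils.getCurrentUserName()` helper added at the end of this patch. The motivation, per the commit message, is that the JVM's `user.name` property and the Hadoop-level identity can disagree, most visibly under Kerberos, where UGI reports the logged-in principal rather than the OS account. A minimal sketch of the consolidated lookup, assuming only hadoop-common on the classpath:

    import org.apache.hadoop.security.UserGroupInformation

    // Sketch of the lookup this patch centralizes in Utils.getCurrentUserName():
    // prefer an explicit SPARK_USER override, else ask Hadoop's UGI.
    def currentUserName(): String =
      Option(System.getenv("SPARK_USER"))
        .getOrElse(UserGroupInformation.getCurrentUser().getUserName())

    // The two can differ: user.name is the OS account that started the JVM.
    val jvmUser = System.getProperty("user.name")
    println(s"JVM user:    $jvmUser")
    println(s"Hadoop user: ${currentUserName()}")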
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 24490fddc5c6..507996e3933f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -191,7 +191,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

   // log out Spark Version in Spark driver log
   logInfo(s"Running Spark version $SPARK_VERSION")
-  
+
   private[spark] val conf = config.clone()
   conf.validateSettings()
@@ -330,11 +330,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   executorEnvs ++= conf.getExecutorEnv

   // Set SPARK_USER for user who is running SparkContext.
-  val sparkUser = Option {
-    Option(System.getenv("SPARK_USER")).getOrElse(System.getProperty("user.name"))
-  }.getOrElse {
-    SparkContext.SPARK_UNKNOWN_USER
-  }
+  val sparkUser = Utils.getCurrentUserName()
   executorEnvs("SPARK_USER") = sparkUser

   // Create and start the scheduler
@@ -818,7 +814,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       vClass: Class[V],
       conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
     assertNotStopped()
-    // The call to new NewHadoopJob automatically adds security credentials to conf, 
+    // The call to new NewHadoopJob automatically adds security credentials to conf,
     // so we don't need to explicitly add them ourselves
     val job = new NewHadoopJob(conf)
     NewFileInputFormat.addInputPath(job, new Path(path))
@@ -1590,8 +1586,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   @deprecated("use defaultMinPartitions", "1.0.0")
   def defaultMinSplits: Int = math.min(defaultParallelism, 2)

-  /** 
-   * Default min number of partitions for Hadoop RDDs when not given by user 
+  /**
+   * Default min number of partitions for Hadoop RDDs when not given by user
    * Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2.
   * The reasons for this are discussed in https://github.com/mesos/spark/pull/718
    */
@@ -1808,8 +1804,6 @@ object SparkContext extends Logging {

   private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"

-  private[spark] val SPARK_UNKNOWN_USER = "<unknown>"
-
   private[spark] val DRIVER_IDENTIFIER = "<driver>"

   // The following deprecated objects have already been copied to `object AccumulatorParam` to
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index d68854214ef0..37bcefae001e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -52,18 +52,13 @@ class SparkHadoopUtil extends Logging {
    * do a FileSystem.closeAllForUGI in order to avoid leaking Filesystems
    */
   def runAsSparkUser(func: () => Unit) {
-    val user = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
-    if (user != SparkContext.SPARK_UNKNOWN_USER) {
-      logDebug("running as user: " + user)
-      val ugi = UserGroupInformation.createRemoteUser(user)
-      transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
-      ugi.doAs(new PrivilegedExceptionAction[Unit] {
-        def run: Unit = func()
-      })
-    } else {
-      logDebug("running as SPARK_UNKNOWN_USER")
-      func()
-    }
+    val user = Utils.getCurrentUserName()
+    logDebug("running as user: " + user)
+    val ugi = UserGroupInformation.createRemoteUser(user)
+    transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
+    ugi.doAs(new PrivilegedExceptionAction[Unit] {
+      def run: Unit = func()
+    })
   }

   def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 8bbfcd2997dc..834ac7242230 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -20,11 +20,12 @@ package org.apache.spark.deploy
 import java.io.{File, PrintStream}
 import java.lang.reflect.{Modifier, InvocationTargetException}
 import java.net.URL
+import java.security.PrivilegedExceptionAction

 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}

 import org.apache.hadoop.fs.Path
-
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.ivy.Ivy
 import org.apache.ivy.core.LogOptions
 import org.apache.ivy.core.module.descriptor.{DefaultExcludeRule, DefaultDependencyDescriptor, DefaultModuleDescriptor}
@@ -85,7 +86,7 @@ object SparkSubmit {
       printStream.println(appArgs)
     }
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
-    launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
+    launch(childArgs, classpath, sysProps, mainClass, appArgs)
   }

   /**
@@ -380,8 +381,8 @@ object SparkSubmit {
       childClasspath: ArrayBuffer[String],
       sysProps: Map[String, String],
       childMainClass: String,
-      verbose: Boolean = false) {
-    if (verbose) {
+      appArgs: SparkSubmitArguments) {
+    if (appArgs.verbose) {
       printStream.println(s"Main class:\n$childMainClass")
       printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
       printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
@@ -424,8 +425,17 @@ object SparkSubmit {
     if (!Modifier.isStatic(mainMethod.getModifiers)) {
       throw new IllegalStateException("The main method in the given main class must be static")
     }
+
     try {
-      mainMethod.invoke(null, childArgs.toArray)
+      if (appArgs.proxyUser != null) {
+        val proxyUser = UserGroupInformation.createProxyUser(appArgs.proxyUser,
+          UserGroupInformation.getCurrentUser())
+        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
+          override def run(): Unit = mainMethod.invoke(null, childArgs.toArray)
+        })
+      } else {
+        mainMethod.invoke(null, childArgs.toArray)
+      }
     } catch {
       case e: InvocationTargetException => e.getCause match {
         case cause: Throwable => throw cause
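The block added to `runMain` above is Hadoop's standard impersonation pattern: the authenticated "real" user wraps the target user in a proxy UGI and runs the work inside `doAs`, so Hadoop services see the proxied identity while authentication still uses the real user's credentials. A self-contained sketch of the same pattern follows; "alice" is a hypothetical target user, and the real user must be authorized to impersonate her by the cluster's proxy-user settings (`hadoop.proxyuser.<name>.hosts`/`.groups` in core-site.xml):

    import java.security.PrivilegedExceptionAction
    import org.apache.hadoop.security.UserGroupInformation

    // "alice" is a placeholder; impersonation must be allowed by the
    // cluster's hadoop.proxyuser.* configuration or doAs will fail with
    // an AuthorizationException.
    val realUser = UserGroupInformation.getCurrentUser()
    val proxyUgi = UserGroupInformation.createProxyUser("alice", realUser)

    proxyUgi.doAs(new PrivilegedExceptionAction[Unit]() {
      override def run(): Unit = {
        // Filesystem access and job submission done here are performed as
        // "alice" from Hadoop's point of view.
        println(s"Running as: ${UserGroupInformation.getCurrentUser().getUserName()}")
      }
    })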
        case null => throw e
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 5cadc534f4ba..dca0539d9013 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -57,6 +57,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
   var isPython: Boolean = false
   var pyFiles: String = null
   val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
+  var proxyUser: String = null

   /** Default properties present in the currently defined defaults file. */
   lazy val defaultSparkProperties: HashMap[String, String] = {
@@ -339,6 +340,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
         }
         parse(tail)

+      case ("--proxy-user") :: value :: tail =>
+        proxyUser = value
+        parse(tail)
+
       case ("--help" | "-h") :: tail =>
         printUsageAndExit(0)
@@ -407,6 +412,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
         |
         |  --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).
         |
+        |  --proxy-user                User to impersonate when submitting the application.
+        |
         |  --help, -h                  Show this help message and exit
         |  --verbose, -v               Print additional debug output
         |
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 72d15e65bcde..bf0d16f499f8 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -38,6 +38,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.log4j.PropertyConfigurator
 import org.eclipse.jetty.util.MultiException
 import org.json4s._
@@ -1986,6 +1987,16 @@ private[spark] object Utils extends Logging {
       throw new SparkException("Invalid master URL: " + sparkUrl, e)
     }
   }
+
+  /**
+   * Returns the current user name. This is the currently logged in user, unless that's been
+   * overridden by the `SPARK_USER` environment variable.
+   */
+  def getCurrentUserName(): String = {
+    Option(System.getenv("SPARK_USER"))
+      .getOrElse(UserGroupInformation.getCurrentUser().getUserName())
+  }
+
 }

 /**
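With the option plumbed through the launcher scripts and `SparkSubmitArguments`, a submission on behalf of another user would look roughly like the following; this is a hypothetical invocation where the master, user name, class, and jar are all placeholders:

    ./bin/spark-submit --master yarn-cluster --proxy-user alice \
      --class com.example.MyApp my-app.jar

This matches the Oozie/Hive use case from the commit message: a service authenticated with its own credentials submits work that should run, and be accounted for, as the requesting user.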
From 8af06ff3154807807332c3619ebbff32a6349977 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Fri, 6 Feb 2015 17:26:12 -0800
Subject: [PATCH 2/5] Fix usage string.

---
 .../scala/org/apache/spark/deploy/SparkSubmitArguments.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 5bf422506b96..fa38070c6fcf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -481,7 +481,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
         |
         |  --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).
         |
-        |  --proxy-user                User to impersonate when submitting the application.
+        |  --proxy-user NAME           User to impersonate when submitting the application.
         |
         |  --help, -h                  Show this help message and exit
         |  --verbose, -v               Print additional debug output

From 4840de9bb5d1502e2f968487011aac9b04e92306 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Tue, 10 Feb 2015 13:38:37 -0800
Subject: [PATCH 3/5] Review feedback.

Needed some extra code to handle some special exceptions that the
new code may hit, since they seem to confuse the JVM.
---
 bin/utils.sh                                       |  2 +-
 .../org/apache/spark/deploy/SparkSubmit.scala      | 66 +++++++++++++------
 2 files changed, 48 insertions(+), 20 deletions(-)

diff --git a/bin/utils.sh b/bin/utils.sh
index 88eb84d86b79..748dbe345a74 100755
--- a/bin/utils.sh
+++ b/bin/utils.sh
@@ -35,7 +35,7 @@ function gatherSparkSubmitOpts() {
     --master | --deploy-mode | --class | --name | --jars | --packages | --py-files | --files | \
     --conf | --repositories | --properties-file | --driver-memory | --driver-java-options | \
     --driver-library-path | --driver-class-path | --executor-memory | --driver-cores | \
-    --total-executor-cores | --executor-cores | --queue | --num-executors | --archives |
+    --total-executor-cores | --executor-cores | --queue | --num-executors | --archives | \
       --proxy-user)
       if [[ $# -lt 2 ]]; then
         "$SUBMIT_USAGE_FUNCTION"
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index fd653a8f11e3..94125c6cde5b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.deploy

 import java.io.{File, PrintStream}
-import java.lang.reflect.{InvocationTargetException, Modifier}
+import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
 import java.net.URL
 import java.security.PrivilegedExceptionAction
@@ -81,7 +81,7 @@ object SparkSubmit {
   private val CLASS_NOT_FOUND_EXIT_STATUS = 101

   // Exposed for testing
-  private[spark] var exitFn: () => Unit = () => System.exit(-1)
+  private[spark] var exitFn: () => Unit = () => System.exit(1)
   private[spark] var printStream: PrintStream = System.err
   private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)
   private[spark] def printErrorAndExit(str: String) = {
@@ -128,6 +128,34 @@ object SparkSubmit {
    */
   private[spark] def submit(args: SparkSubmitArguments): Unit = {
     val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
+
+    def doRunMain(): Unit = {
+      if (args.proxyUser != null) {
+        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
+          UserGroupInformation.getCurrentUser())
+        try {
+          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
+            override def run(): Unit = {
+              runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
+            }
+          })
+        } catch {
+          case e: Exception =>
+            // Hadoop's AuthorizationException suppresses the exception's stack trace, which
+            // confuses the JVM when propagating it. Instead, detect exceptions with empty
+            // stack traces here, and treat them differently.
+            if (e.getStackTrace().length == 0) {
+              printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
+              exitFn()
+            } else {
+              throw e
+            }
+        }
+      } else {
+        runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
+      }
+    }
+
     // In standalone cluster mode, there are two submission gateways:
     //   (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
     //   (2) The new REST-based gateway introduced in Spark 1.3
@@ -136,7 +164,7 @@ object SparkSubmit {
     if (args.isStandaloneCluster && args.useRest) {
       try {
         printStream.println("Running Spark using the REST application submission protocol.")
-        runMain(childArgs, childClasspath, sysProps, childMainClass, args)
+        doRunMain()
       } catch {
         // Fail over to use the legacy submission gateway
        case e: SubmitRestConnectionException =>
@@ -147,7 +175,7 @@ object SparkSubmit {
       }
     // In all other modes, just run the main class as prepared
     } else {
-      runMain(childArgs, childClasspath, sysProps, childMainClass, args)
+      doRunMain()
     }
   }
@@ -454,13 +482,14 @@ object SparkSubmit {
    * Note that this main class will not be the one provided by the user if we're
    * running cluster deploy mode or python applications.
    */
+  @throws[Exception]
   private def runMain(
       childArgs: Seq[String],
       childClasspath: Seq[String],
       sysProps: Map[String, String],
       childMainClass: String,
-      appArgs: SparkSubmitArguments) {
-    if (appArgs.verbose) {
+      verbose: Boolean): Unit = {
+    if (verbose) {
       printStream.println(s"Main class:\n$childMainClass")
       printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
       printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
@@ -510,21 +539,20 @@ object SparkSubmit {
       throw new IllegalStateException("The main method in the given main class must be static")
     }

+    def findCause(t: Throwable): Throwable = t match {
+      case e: UndeclaredThrowableException =>
+        if (e.getCause() != null) findCause(e.getCause()) else e
+      case e: InvocationTargetException =>
+        if (e.getCause() != null) findCause(e.getCause()) else e
+      case e: Throwable =>
+        e
+    }
+
     try {
-      if (appArgs.proxyUser != null) {
-        val proxyUser = UserGroupInformation.createProxyUser(appArgs.proxyUser,
-          UserGroupInformation.getCurrentUser())
-        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
-          override def run(): Unit = mainMethod.invoke(null, childArgs.toArray)
-        })
-      } else {
-        mainMethod.invoke(null, childArgs.toArray)
-      }
+      mainMethod.invoke(null, childArgs.toArray)
     } catch {
-      case e: InvocationTargetException => e.getCause match {
-        case cause: Throwable => throw cause
-        case null => throw e
-      }
+      case t: Throwable =>
+        throw findCause(t)
     }
   }
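Two details in this revision are easy to miss. First, exceptions thrown inside `doAs` or through `Method.invoke` arrive wrapped (in `UndeclaredThrowableException` and `InvocationTargetException` respectively), which is what `findCause` peels off. Second, Hadoop's `AuthorizationException` carries an empty stack trace, which is what the `getStackTrace().length == 0` test detects. A small self-contained sketch of both behaviors; `QuietException` is a hypothetical stand-in for `AuthorizationException`:

    import java.lang.reflect.{InvocationTargetException, UndeclaredThrowableException}

    // Stand-in for an exception that suppresses its stack trace, the way the
    // commit message says Hadoop's AuthorizationException does.
    class QuietException(msg: String) extends Exception(msg) {
      override def getStackTrace(): Array[StackTraceElement] =
        Array.empty[StackTraceElement]
    }

    // Same unwrapping idea as findCause above: peel off the reflective and
    // doAs wrappers until the application's real exception is reached.
    def unwrap(t: Throwable): Throwable = t match {
      case e: UndeclaredThrowableException if e.getCause != null => unwrap(e.getCause)
      case e: InvocationTargetException if e.getCause != null => unwrap(e.getCause)
      case e => e
    }

    val wrapped = new InvocationTargetException(
      new UndeclaredThrowableException(new QuietException("not authorized")))
    val cause = unwrap(wrapped)

    // An empty stack trace is the signal to print a one-line error instead
    // of letting the JVM dump an unhelpful trace.
    assert(cause.getStackTrace.length == 0)
    println(s"ERROR: ${cause.getClass.getName}: ${cause.getMessage}")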
From 05bfc08192b1af7e56170e4a9a80b3bfc5456459 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Tue, 10 Feb 2015 13:40:51 -0800
Subject: [PATCH 4/5] Remove unneeded annotation.

---
 core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 94125c6cde5b..d2be253f57a4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -482,7 +482,6 @@ object SparkSubmit {
    * Note that this main class will not be the one provided by the user if we're
    * running cluster deploy mode or python applications.
    */
-  @throws[Exception]
   private def runMain(
       childArgs: Seq[String],
       childClasspath: Seq[String],
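For context on this removal: Scala has no checked exceptions, so `@throws` only affects the Java-facing signature of a method (and generated documentation). For an internal method like `runMain` it is inert, as this minimal sketch illustrates:

    object Example {
      // Compiles and behaves identically with or without the annotation;
      // it only adds "throws Exception" to the emitted Java signature.
      @throws[Exception]
      def mightFail(): Unit = ()
    }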
From df82427a3bae88957744b5d5aeca06a8d6d36d1f Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Tue, 10 Feb 2015 13:54:38 -0800
Subject: [PATCH 5/5] Clarify the reason for the special exception handling.

---
 core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index d2be253f57a4..581ae6947fd4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -142,8 +142,8 @@ object SparkSubmit {
         } catch {
           case e: Exception =>
             // Hadoop's AuthorizationException suppresses the exception's stack trace, which
-            // confuses the JVM when propagating it. Instead, detect exceptions with empty
-            // stack traces here, and treat them differently.
+            // makes the message printed to the output by the JVM not very helpful. Instead,
+            // detect exceptions with empty stack traces here, and treat them differently.
             if (e.getStackTrace().length == 0) {
               printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
               exitFn()