From 515613d759f01a81bc39a1bf230094ae6f012d6b Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 1 Aug 2014 15:12:17 -0700 Subject: [PATCH 1/4] [SPARK-2718] [yarn] Handle quotes and other characters in user args. Due to the way Yarn runs things through bash, normal quoting doesn't work as expected. This change applies the necessary voodoo to the user args to avoid issues with bash and special characters. The change also uncovered an issue with the event logger app name sanitizing code; it wasn't cleaning up all "bad" characters, so sometimes it would fail to create the log dirs. I just added some more bad character replacements. --- .../scheduler/EventLoggingListener.scala | 3 +- .../apache/spark/deploy/yarn/ClientBase.scala | 9 +-- .../deploy/yarn/ExecutorRunnableUtil.scala | 4 +- .../deploy/yarn/YarnSparkHadoopUtil.scala | 21 ++++++ .../cluster/YarnClientSchedulerBackend.scala | 4 +- .../yarn/YarnSparkHadoopUtilSuite.scala | 64 +++++++++++++++++++ 6 files changed, 96 insertions(+), 9 deletions(-) create mode 100644 yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 406147f167bf3..d5e252d247b02 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -54,7 +54,8 @@ private[spark] class EventLoggingListener( private val testing = sparkConf.getBoolean("spark.eventLog.testing", false) private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024 private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/") - private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis + private val name = appName.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_") + .toLowerCase + "-" + System.currentTimeMillis val logDir = Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/") protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize, diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index b7e8636e02eb2..08239897a1f4f 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -299,11 +299,11 @@ trait ClientBase extends Logging { } def userArgsToString(clientArgs: ClientArguments): String = { - val prefix = " --args " + val prefix = " --arg " val args = clientArgs.userArgs val retval = new StringBuilder() for (arg <- args) { - retval.append(prefix).append(" '").append(arg).append("' ") + retval.append(prefix).append(" ").append(YarnSparkHadoopUtil.escapeForShell(arg)) } retval.toString } @@ -385,7 +385,7 @@ trait ClientBase extends Logging { // TODO: it might be nicer to pass these as an internal environment variable rather than // as Java options, due to complications with string parsing of nested quotes. 
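+      // Yarn ultimately runs this command line through `bash -c "..."`,
+      // so each -D option has to survive an extra round of shell parsing;
+      // escapeForShell handles that instead of the hand-rolled \" wrapping
+      // used before.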
for ((k, v) <- sparkConf.getAll) { - javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" + javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") } if (args.amClass == classOf[ApplicationMaster].getName) { @@ -399,7 +399,8 @@ trait ClientBase extends Logging { // Command for the ApplicationMaster val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++ javaOpts ++ - Seq(args.amClass, "--class", args.userClass, "--jar ", args.userJar, + Seq(args.amClass, "--class", YarnSparkHadoopUtil.escapeForShell(args.userClass), + "--jar ", YarnSparkHadoopUtil.escapeForShell(args.userJar), userArgsToString(args), "--executor-memory", args.executorMemory.toString, "--executor-cores", args.executorCores.toString, diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala index 4ba7133a959ed..6e9555dfe7209 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala @@ -68,10 +68,10 @@ trait ExecutorRunnableUtil extends Logging { // authentication settings. sparkConf.getAll. filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }. - foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" } + foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") } sparkConf.getAkkaConf. - foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" } + foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") } // Commenting it out for now - so that people can refer to the properties if required. Remove // it once cpuset version is pushed out. diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index e98308cdbd74e..c0900f4df284d 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -148,4 +148,25 @@ object YarnSparkHadoopUtil { } } + /** + * Escapes a string for inclusion in a command line executed by Yarn. Yarn executes commands + * using `bash -c "command arg1 arg2"` and that means plain quoting doesn't really work. The + * argument is enclosed in single quotes and some key characters are escaped. + * + * @param arg A single argument. + * @return Argument quoted for execution via Yarn's generated shell script. 
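+   *
+   * For example, the argument ab'cd becomes 'ab'\"'\"'cd'. Yarn's
+   * bash -c "..." wrapper strips one layer of escaping, leaving
+   * 'ab'"'"'cd', which the target shell reassembles into ab'cd.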
+ */ + def escapeForShell(arg: String): String = { + val escaped = new StringBuilder("'") + for (i <- 0 to arg.length() - 1) { + arg.charAt(i) match { + case '$' => escaped.append("\\$") + case '"' => escaped.append("\\\"") + case '\'' => escaped.append("'\\\"'\\\"'") + case c => escaped.append(c) + } + } + escaped.append("'").toString() + } + } diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index f8fb96b312f23..877ccab181dbf 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -43,9 +43,9 @@ private[spark] class YarnClientSchedulerBackend( private[spark] def addArg(optionName: String, envVar: String, sysProp: String, arrayBuf: ArrayBuffer[String]) { if (System.getenv(envVar) != null) { - arrayBuf += (optionName, System.getenv(envVar)) + arrayBuf += (optionName, YarnSparkHadoopUtil.escapeForShell(System.getenv(envVar))) } else if (sc.getConf.contains(sysProp)) { - arrayBuf += (optionName, sc.getConf.get(sysProp)) + arrayBuf += (optionName, YarnSparkHadoopUtil.escapeForShell(sc.getConf.get(sysProp))) } } diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala new file mode 100644 index 0000000000000..e4773fb9da17f --- /dev/null +++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.yarn + +import java.io.{File, IOException} + +import com.google.common.io.{ByteStreams, Files} +import org.scalatest.{FunSuite, Matchers} + +import org.apache.spark.Logging + +class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging { + + val hasBash = + try { + val exitCode = Runtime.getRuntime().exec(Array("bash", "--version")).waitFor() + exitCode == 0 + } catch { + case e: IOException => + false + } + + if (!hasBash) { + logInfo("Cannot execute bash, skipping all tests.") + } + + def bashTest(name: String)(fn: => Unit) = + if (hasBash) test(name)(fn) else ignore(name)(fn) + + bashTest("shell script escaping") { + val scriptFile = File.createTempFile("script.", ".sh") + val args = Array("arg1", "${arg.2}", "\"arg3\"", "'arg4'", "$arg5") + try { + val argLine = args.map(a => YarnSparkHadoopUtil.escapeForShell(a)).mkString(" ") + Files.write(("bash -c \"echo " + argLine + "\"").getBytes(), scriptFile) + scriptFile.setExecutable(true) + + val proc = Runtime.getRuntime().exec(Array(scriptFile.getAbsolutePath())) + val out = new String(ByteStreams.toByteArray(proc.getInputStream())).trim() + val err = new String(ByteStreams.toByteArray(proc.getErrorStream())) + val exitCode = proc.waitFor() + exitCode should be (0) + out should be (args.mkString(" ")) + } finally { + scriptFile.delete() + } + } + +} From 55571d4641414f15f3d08dff5484b9c76bd1f8ba Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 1 Aug 2014 16:43:07 -0700 Subject: [PATCH 2/4] Unbreak yarn-client. Mental note: always test both. --- .../yarn/ApplicationMasterArguments.scala | 6 +++--- .../deploy/yarn/YarnSparkHadoopUtil.scala | 20 +++++++++++-------- .../cluster/YarnClientSchedulerBackend.scala | 4 ++-- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala index 4c383ab574abe..424b0fb0936f2 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala @@ -29,7 +29,7 @@ class ApplicationMasterArguments(val args: Array[String]) { var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS parseArgs(args.toList) - + private def parseArgs(inputArgs: List[String]): Unit = { val userArgsBuffer = new ArrayBuffer[String]() @@ -47,7 +47,7 @@ class ApplicationMasterArguments(val args: Array[String]) { userClass = value args = tail - case ("--args") :: value :: tail => + case ("--args" | "--arg") :: value :: tail => userArgsBuffer += value args = tail @@ -75,7 +75,7 @@ class ApplicationMasterArguments(val args: Array[String]) { userArgs = userArgsBuffer.readOnly } - + def printUsageAndExit(exitCode: Int, unknownParam: Any = null) { if (unknownParam != null) { System.err.println("Unknown/unsupported param " + unknownParam) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index c0900f4df284d..5692c5f6367e7 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -157,16 +157,20 @@ object YarnSparkHadoopUtil { * @return Argument quoted for execution via Yarn's generated shell script. 
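+   * A null argument is returned as-is rather than quoted.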
*/ def escapeForShell(arg: String): String = { - val escaped = new StringBuilder("'") - for (i <- 0 to arg.length() - 1) { - arg.charAt(i) match { - case '$' => escaped.append("\\$") - case '"' => escaped.append("\\\"") - case '\'' => escaped.append("'\\\"'\\\"'") - case c => escaped.append(c) + if (arg != null) { + val escaped = new StringBuilder("'") + for (i <- 0 to arg.length() - 1) { + arg.charAt(i) match { + case '$' => escaped.append("\\$") + case '"' => escaped.append("\\\"") + case '\'' => escaped.append("'\\\"'\\\"'") + case c => escaped.append(c) + } } + escaped.append("'").toString() + } else { + arg } - escaped.append("'").toString() } } diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 877ccab181dbf..f8fb96b312f23 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -43,9 +43,9 @@ private[spark] class YarnClientSchedulerBackend( private[spark] def addArg(optionName: String, envVar: String, sysProp: String, arrayBuf: ArrayBuffer[String]) { if (System.getenv(envVar) != null) { - arrayBuf += (optionName, YarnSparkHadoopUtil.escapeForShell(System.getenv(envVar))) + arrayBuf += (optionName, System.getenv(envVar)) } else if (sc.getConf.contains(sysProp)) { - arrayBuf += (optionName, YarnSparkHadoopUtil.escapeForShell(sc.getConf.get(sysProp))) + arrayBuf += (optionName, sc.getConf.get(sysProp)) } } From c1a257a553f8156dd2eefb2826f685634eeefa65 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 4 Aug 2014 16:03:50 -0700 Subject: [PATCH 3/4] Add test for backslashes. --- .../org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala index e4773fb9da17f..07ac4f56f7aaf 100644 --- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala @@ -44,7 +44,7 @@ class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging { bashTest("shell script escaping") { val scriptFile = File.createTempFile("script.", ".sh") - val args = Array("arg1", "${arg.2}", "\"arg3\"", "'arg4'", "$arg5") + val args = Array("arg1", "${arg.2}", "\"arg3\"", "'arg4'", "$arg5", "\\arg6") try { val argLine = args.map(a => YarnSparkHadoopUtil.escapeForShell(a)).mkString(" ") Files.write(("bash -c \"echo " + argLine + "\"").getBytes(), scriptFile) From cc84b89f1f35462d7f8f06826815214e079499b8 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 6 Aug 2014 16:02:54 -0700 Subject: [PATCH 4/4] Review feedback. 
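
The '\"'\"' sequence used for embedded single quotes only worked because
Yarn's bash -c "..." wrapper stripped the backslash-escaped double quotes
back out. The conventional '\'' idiom is shorter and survives that wrapper
unchanged, since a backslash before a single quote is preserved inside
double quotes. A worked trace for the argument ab'cd:

    escaped by escapeForShell:    'ab'\''cd'
    after the outer "..." layer:  'ab'\''cd'
    after the inner shell parse:  ab'cd

Also upgrade the "cannot execute bash" message to a warning, since it means
the shell-escaping tests are being skipped.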
--- .../org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala | 2 +- .../org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 5692c5f6367e7..10aef5eb2486f 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -163,7 +163,7 @@ object YarnSparkHadoopUtil { arg.charAt(i) match { case '$' => escaped.append("\\$") case '"' => escaped.append("\\\"") - case '\'' => escaped.append("'\\\"'\\\"'") + case '\'' => escaped.append("'\\''") case c => escaped.append(c) } } diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala index 07ac4f56f7aaf..7650bd4396c12 100644 --- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala @@ -36,7 +36,7 @@ class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging { } if (!hasBash) { - logInfo("Cannot execute bash, skipping all tests.") + logWarning("Cannot execute bash, skipping bash tests.") } def bashTest(name: String)(fn: => Unit) =
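
Taken together, the four patches settle on the escaping shown above. Below
is a minimal, self-contained sketch of the round trip the test suite
exercises. It assumes only that bash is on the PATH; the object name and
the inlined copy of escapeForShell are illustrative, not part of the patch:

    // Round-trip sketch: escape the sample args, push them through the
    // same two layers of bash parsing that Yarn's generated launch script
    // applies, and check that the original strings come back out.
    object EscapeRoundTrip {
      // Inlined copy of the final escapeForShell logic from this series.
      def escapeForShell(arg: String): String = {
        val escaped = new StringBuilder("'")
        arg.foreach {
          case '$'  => escaped.append("\\$")   // stop the outer "..." layer expanding it
          case '"'  => escaped.append("\\\"")  // survive the outer double quotes
          case '\'' => escaped.append("'\\''") // close, emit a literal ', reopen
          case c    => escaped.append(c)
        }
        escaped.append("'").toString()
      }

      def main(args: Array[String]): Unit = {
        val original = Seq("arg1", "${arg.2}", "\"arg3\"", "'arg4'", "$arg5", "\\arg6")
        val argLine  = original.map(escapeForShell).mkString(" ")
        // The outer bash strips the double quotes (Yarn's wrapping layer);
        // the inner bash -c strips the single quotes added above.
        val proc = Runtime.getRuntime.exec(
          Array("bash", "-c", "bash -c \"echo " + argLine + "\""))
        val out = scala.io.Source.fromInputStream(proc.getInputStream).mkString.trim
        proc.waitFor()
        assert(out == original.mkString(" "), s"round trip failed: <$out>")
        println("round trip ok: " + out)
      }
    }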