From 413fa25dde845146153a58793ca6b3ec3a820ea8 Mon Sep 17 00:00:00 2001 From: linweizhong Date: Sun, 12 Apr 2015 16:02:43 +0800 Subject: [PATCH 01/11] Pass PYTHONPATH to executor --- .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 5 +++++ .../org/apache/spark/deploy/yarn/ExecutorRunnable.scala | 6 ++++++ 2 files changed, 11 insertions(+) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index c1effd3c8a718..cdc3ad51492e7 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -371,6 +371,11 @@ private[spark] class Client( env(ENV_DIST_CLASSPATH) = dcp } + sys.env.get("PYTHONPATH") match { + case Some(pythonPath) => env("PYTHONPATH") = pythonPath + case None => // do nothing + } + env } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 1ce10d906ab23..3e23cb8e616f1 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -299,6 +299,12 @@ class ExecutorRunnable( } System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v } + + sys.env.get("PYTHONPATH") match { + case Some(pythonPath) => env("PYTHONPATH") = pythonPath + case None => // do nothing + } + env } } From 51ebb262b0a0ff30603b3ce650c474870a152582 Mon Sep 17 00:00:00 2001 From: linweizhong Date: Thu, 16 Apr 2015 15:54:03 +0800 Subject: [PATCH 02/11] Ship pyspark archives and add it to PYTHONPATH --- .../org/apache/spark/api/python/PythonUtils.scala | 4 ++++ .../scala/org/apache/spark/deploy/yarn/Client.scala | 5 ----- .../apache/spark/deploy/yarn/ExecutorRunnable.scala | 13 ++++++------- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala index acbaba6791850..1c92af45f869a 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala @@ -35,6 +35,10 @@ private[spark] object PythonUtils { pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.8.2.1-src.zip").mkString(File.separator) } pythonPath ++= SparkContext.jarOfObject(this) + sys.env.get("PYSPARK_ARCHIVES_PATH") match { + case Some(path) => pythonPath += path + case None => // do nothing + } pythonPath.mkString(File.pathSeparator) } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index cdc3ad51492e7..c1effd3c8a718 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -371,11 +371,6 @@ private[spark] class Client( env(ENV_DIST_CLASSPATH) = dcp } - sys.env.get("PYTHONPATH") match { - case Some(pythonPath) => env("PYTHONPATH") = pythonPath - case None => // do nothing - } - env } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 3e23cb8e616f1..6c1101d8ac42e 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -75,7 
+75,12 @@ class ExecutorRunnable( val localResources = prepareLocalResources ctx.setLocalResources(localResources) - + // From SPARK-1920 and SPARK-1520 we know PySpark on Yarn can not work when the assembly jar are + // package by JDK 1.7+, so we ship PySpark archives to executors by Yarn with --py-files, and + // add this path to PYTHONPATH. + for ((k, v) <- localResources if k.contains("spark-pyspark")) { + env("PYSPARK_ARCHIVES_PATH") = k + } ctx.setEnvironment(env) val credentials = UserGroupInformation.getCurrentUser().getCredentials() @@ -299,12 +304,6 @@ class ExecutorRunnable( } System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v } - - sys.env.get("PYTHONPATH") match { - case Some(pythonPath) => env("PYTHONPATH") = pythonPath - case None => // do nothing - } - env } } From 309679acec332c7315357a3430795f6b2880019e Mon Sep 17 00:00:00 2001 From: linweizhong Date: Thu, 16 Apr 2015 18:40:33 +0800 Subject: [PATCH 03/11] Update doc --- docs/submitting-applications.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/submitting-applications.md b/docs/submitting-applications.md index 3ecbf2308cd44..46204634a74d4 100644 --- a/docs/submitting-applications.md +++ b/docs/submitting-applications.md @@ -22,6 +22,9 @@ For Python, you can use the `--py-files` argument of `spark-submit` to add `.py` files to be distributed with your application. If you depend on multiple Python files we recommend packaging them into a `.zip` or `.egg`. +As Python can not read files from assembly jar which package by JDK1.7+, so packaging pyspark into a +`.zip`(the name contains "spark-pyspark") and use `--py-files` argument of `spark-submit` to distribute it. + # Launching Applications with spark-submit Once a user application is bundled, it can be launched using the `bin/spark-submit` script. From 052e288877b195341d3898f2b2aa6b23c2d5b680 Mon Sep 17 00:00:00 2001 From: linweizhong Date: Thu, 16 Apr 2015 18:43:10 +0800 Subject: [PATCH 04/11] Update --- docs/submitting-applications.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/submitting-applications.md b/docs/submitting-applications.md index 46204634a74d4..845c70b8b478c 100644 --- a/docs/submitting-applications.md +++ b/docs/submitting-applications.md @@ -22,7 +22,7 @@ For Python, you can use the `--py-files` argument of `spark-submit` to add `.py` files to be distributed with your application. If you depend on multiple Python files we recommend packaging them into a `.zip` or `.egg`. -As Python can not read files from assembly jar which package by JDK1.7+, so packaging pyspark into a +As Python can not read files from assembly jar which packaged by JDK1.7+, so packaging pyspark into a `.zip`(the name contains "spark-pyspark") and use `--py-files` argument of `spark-submit` to distribute it. # Launching Applications with spark-submit From 3a0ec77bc092dfe3dd9c467f1979d946c7f8cca3 Mon Sep 17 00:00:00 2001 From: linweizhong Date: Thu, 16 Apr 2015 21:03:27 +0800 Subject: [PATCH 05/11] Update --- docs/submitting-applications.md | 2 +- .../scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/submitting-applications.md b/docs/submitting-applications.md index 845c70b8b478c..73afbe06df606 100644 --- a/docs/submitting-applications.md +++ b/docs/submitting-applications.md @@ -23,7 +23,7 @@ files to be distributed with your application. If you depend on multiple Python packaging them into a `.zip` or `.egg`. 
As Python can not read files from assembly jar which packaged by JDK1.7+, so packaging pyspark into a -`.zip`(the name contains "spark-pyspark") and use `--py-files` argument of `spark-submit` to distribute it. +`.zip`(the name contains "pyspark") and use `--py-files` argument of `spark-submit` to distribute it. # Launching Applications with spark-submit diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 6c1101d8ac42e..a8341991f64ad 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -78,8 +78,8 @@ class ExecutorRunnable( // From SPARK-1920 and SPARK-1520 we know PySpark on Yarn can not work when the assembly jar are // package by JDK 1.7+, so we ship PySpark archives to executors by Yarn with --py-files, and // add this path to PYTHONPATH. - for ((k, v) <- localResources if k.contains("spark-pyspark")) { - env("PYSPARK_ARCHIVES_PATH") = k + for ((resPath, res) <- localResources if resPath.contains("pyspark")) { + env("PYSPARK_ARCHIVES_PATH") = resPath } ctx.setEnvironment(env) From 547fd957ba224c86cf828890562b2eafde2b8ecb Mon Sep 17 00:00:00 2001 From: linweizhong Date: Fri, 17 Apr 2015 14:10:18 +0800 Subject: [PATCH 06/11] Update pyspark.zip auto --- .../apache/spark/api/python/PythonUtils.scala | 4 --- .../api/python/PythonWorkerFactory.scala | 2 +- .../apache/spark/deploy/PythonRunner.scala | 1 + docs/submitting-applications.md | 3 --- .../org/apache/spark/deploy/yarn/Client.scala | 26 ++++++++++++++++++- .../spark/deploy/yarn/ExecutorRunnable.scala | 13 +++++----- 6 files changed, 34 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala index 1c92af45f869a..acbaba6791850 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala @@ -35,10 +35,6 @@ private[spark] object PythonUtils { pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.8.2.1-src.zip").mkString(File.separator) } pythonPath ++= SparkContext.jarOfObject(this) - sys.env.get("PYSPARK_ARCHIVES_PATH") match { - case Some(path) => pythonPath += path - case None => // do nothing - } pythonPath.mkString(File.pathSeparator) } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index e314408c067e9..3f6e0dd6050d2 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -49,7 +49,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String val pythonPath = PythonUtils.mergePythonPaths( PythonUtils.sparkPythonPath, - envVars.getOrElse("PYTHONPATH", ""), + envVars.getOrElse("PYTHONPATH", sys.env.getOrElse("PYSPARK_ARCHIVES_PATH", "")), sys.env.getOrElse("PYTHONPATH", "")) def create(): Socket = { diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala index 53e18c4bcec23..569abcfff68f5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala @@ -52,6 +52,7 @@ object PythonRunner { 
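// Driver-side PYTHONPATH assembly: the formatted --py-files entries, Spark's bundled
// python path, the caller's existing PYTHONPATH, and (added in this hunk) the
// PYSPARK_ARCHIVES_PATH entry pointing at the shipped pyspark archive.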
pathElements ++= formattedPyFiles pathElements += PythonUtils.sparkPythonPath pathElements += sys.env.getOrElse("PYTHONPATH", "") + pathElements += sys.env.getOrElse("PYSPARK_ARCHIVES_PATH", "") val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*) // Launch Python process diff --git a/docs/submitting-applications.md b/docs/submitting-applications.md index 73afbe06df606..3ecbf2308cd44 100644 --- a/docs/submitting-applications.md +++ b/docs/submitting-applications.md @@ -22,9 +22,6 @@ For Python, you can use the `--py-files` argument of `spark-submit` to add `.py` files to be distributed with your application. If you depend on multiple Python files we recommend packaging them into a `.zip` or `.egg`. -As Python can not read files from assembly jar which packaged by JDK1.7+, so packaging pyspark into a -`.zip`(the name contains "pyspark") and use `--py-files` argument of `spark-submit` to distribute it. - # Launching Applications with spark-submit Once a user application is bundled, it can be launched using the `bin/spark-submit` script. diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index c1effd3c8a718..b1ee4096f4b6c 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -247,6 +247,7 @@ private[spark] class Client( List( (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR), (APP_JAR, args.userJar, CONF_SPARK_USER_JAR), + (PYSPARK_ARCHIVES, pysparkArchives(sparkConf), CONF_PYSPARK_ARCHIVES), ("log4j.properties", oldLog4jConf.orNull, null) ).foreach { case (destName, _localPath, confKey) => val localPath: String = if (_localPath != null) _localPath.trim() else "" @@ -386,6 +387,12 @@ private[spark] class Client( val appStagingDir = getAppStagingDir(appId) val localResources = prepareLocalResources(appStagingDir) val launchEnv = setupLaunchEnv(appStagingDir) + // From SPARK-1920 and SPARK-1520 we know PySpark on Yarn can not work when the assembly jar are + // package by JDK 1.7+, so we ship PySpark archives to executors as assembly jar, and add this + // path to PYTHONPATH. + for ((resPath, res) <- localResources if resPath.contains(PYSPARK_ARCHIVES)) { + launchEnv("PYSPARK_ARCHIVES_PATH") = resPath + } val amContainer = Records.newRecord(classOf[ContainerLaunchContext]) amContainer.setLocalResources(localResources) amContainer.setEnvironment(launchEnv) @@ -681,9 +688,10 @@ object Client extends Logging { new Client(args, sparkConf).run() } - // Alias for the Spark assembly jar and the user jar + // Alias for the Spark assembly jar, the user jar and PySpark archives val SPARK_JAR: String = "__spark__.jar" val APP_JAR: String = "__app__.jar" + val PYSPARK_ARCHIVES: String = "__pyspark__.zip" // URI scheme that identifies local resources val LOCAL_SCHEME = "local" @@ -695,6 +703,9 @@ object Client extends Logging { val CONF_SPARK_JAR = "spark.yarn.jar" val ENV_SPARK_JAR = "SPARK_JAR" + // Location of any user-defined PySpark archives + val CONF_PYSPARK_ARCHIVES = "spark.pyspark.archives" + // Internal config to propagate the location of the user's jar to the driver/executors val CONF_SPARK_USER_JAR = "spark.yarn.user.jar" @@ -733,6 +744,19 @@ object Client extends Logging { } } + /** + * Find the user-defined PySpark archives if configured, or return default. + * The default pyspark.zip is in the same path with assembly jar. 
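+   * Concretely: return the value of CONF_PYSPARK_ARCHIVES when it is set; otherwise derive
+   * "<directory of the assembly jar>/pyspark.zip" from the jar located via SparkContext.jarOfClass.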
+ */ + private def pysparkArchives(conf: SparkConf): String = { + if (conf.contains(CONF_PYSPARK_ARCHIVES)) { + conf.get(CONF_PYSPARK_ARCHIVES) + } else { + val sparkJarPath = SparkContext.jarOfClass(this.getClass).head + sparkJarPath.substring(0, sparkJarPath.lastIndexOf('/')) + "/pyspark.zip" + } + } + /** * Return the path to the given application's staging directory. */ diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index a8341991f64ad..2325ffcb78772 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -75,12 +75,7 @@ class ExecutorRunnable( val localResources = prepareLocalResources ctx.setLocalResources(localResources) - // From SPARK-1920 and SPARK-1520 we know PySpark on Yarn can not work when the assembly jar are - // package by JDK 1.7+, so we ship PySpark archives to executors by Yarn with --py-files, and - // add this path to PYTHONPATH. - for ((resPath, res) <- localResources if resPath.contains("pyspark")) { - env("PYSPARK_ARCHIVES_PATH") = resPath - } + ctx.setEnvironment(env) val credentials = UserGroupInformation.getCurrentUser().getCredentials() @@ -304,6 +299,12 @@ class ExecutorRunnable( } System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v } + + // Add PySpark archives path + sys.env.get("PYSPARK_ARCHIVES_PATH") match { + case Some(pythonArchivesPath) => env("PYSPARK_ARCHIVES_PATH") = pythonArchivesPath + case None => + } env } } From c63f31fc008801649450d6dffc7f2ba7cc77645d Mon Sep 17 00:00:00 2001 From: linweizhong Date: Fri, 17 Apr 2015 17:43:30 +0800 Subject: [PATCH 07/11] Update --- .../org/apache/spark/deploy/yarn/Client.scala | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index b1ee4096f4b6c..d07d36ffac44d 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -17,6 +17,7 @@ package org.apache.spark.deploy.yarn +import java.io.File import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException} import java.nio.ByteBuffer @@ -752,8 +753,17 @@ object Client extends Logging { if (conf.contains(CONF_PYSPARK_ARCHIVES)) { conf.get(CONF_PYSPARK_ARCHIVES) } else { - val sparkJarPath = SparkContext.jarOfClass(this.getClass).head - sparkJarPath.substring(0, sparkJarPath.lastIndexOf('/')) + "/pyspark.zip" + SparkContext.jarOfClass(this.getClass) match { + case Some(jarPath) => + val path = new File(jarPath) + val archives = new File(path.getParent + File.separator + "pyspark.zip") + if (archives.exists()) { + archives.getAbsolutePath + } else { + "" + } + case None => "" + } } } From d012cdebf154bb5458c060d50d751f97c5bf3384 Mon Sep 17 00:00:00 2001 From: linweizhong Date: Wed, 22 Apr 2015 17:34:19 +0800 Subject: [PATCH 08/11] Update --- .../api/python/PythonWorkerFactory.scala | 2 +- .../apache/spark/deploy/PythonRunner.scala | 1 - .../org/apache/spark/deploy/SparkSubmit.scala | 15 ++++++ .../scala/org/apache/spark/util/Utils.scala | 51 +++++++++++++++++++ .../org/apache/spark/deploy/yarn/Client.scala | 40 ++++----------- .../spark/deploy/yarn/ExecutorRunnable.scala | 6 --- 6 files changed, 77 insertions(+), 38 deletions(-) diff --git 
a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index 3f6e0dd6050d2..e314408c067e9 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -49,7 +49,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String val pythonPath = PythonUtils.mergePythonPaths( PythonUtils.sparkPythonPath, - envVars.getOrElse("PYTHONPATH", sys.env.getOrElse("PYSPARK_ARCHIVES_PATH", "")), + envVars.getOrElse("PYTHONPATH", ""), sys.env.getOrElse("PYTHONPATH", "")) def create(): Socket = { diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala index 569abcfff68f5..53e18c4bcec23 100644 --- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala @@ -52,7 +52,6 @@ object PythonRunner { pathElements ++= formattedPyFiles pathElements += PythonUtils.sparkPythonPath pathElements += sys.env.getOrElse("PYTHONPATH", "") - pathElements += sys.env.getOrElse("PYSPARK_ARCHIVES_PATH", "") val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*) // Launch Python process diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 60bc243ebf40a..f9e20d1397a74 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -356,6 +356,21 @@ object SparkSubmit { } } + if (args.isPython && clusterManager == YARN) { + // Zip PySpark from ${SPARK_HOME}/python/pyspark to ${SPARK_HOME}/lib/pyspark.zip + // and ship to executors by Yarn. + for (sparkHome <- sys.env.get("SPARK_HOME")) { + val srcFile = new File(Seq(sparkHome, "python", "pyspark").mkString(File.separator)) + val archives = new File(Seq(sparkHome, "lib", "pyspark.zip").mkString(File.separator)) + if (archives.exists() || Utils.createZipArchives(archives, srcFile, "pyspark")) { + val py4jPath = Seq(sparkHome, "python", "lib", "py4j-0.8.2.1-src.zip") + .mkString(File.separator) + args.files = mergeFileLists(args.files, Utils.resolveURIs(archives.getAbsolutePath), + py4jPath) + } + } + } + // Special flag to avoid deprecation warnings at the client sysProps("SPARK_SUBMIT") = "true" diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 0fdfaf300e95d..bf7ef37d6ed83 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -21,6 +21,7 @@ import java.io._ import java.lang.management.ManagementFactory import java.net._ import java.nio.ByteBuffer +import java.util.zip.{ZipEntry, ZipOutputStream} import java.util.{Properties, Locale, Random, UUID} import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor} import javax.net.ssl.HttpsURLConnection @@ -2106,6 +2107,56 @@ private[spark] object Utils extends Logging { .getOrElse(UserGroupInformation.getCurrentUser().getShortUserName()) } + /** + * Create zip archives. 
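+   * Recursively writes `srcFile` (a file or a directory) into the zip file `archives`, rooting
+   * the entries at `rootPath`; returns true once the contents have been written, false if the
+   * target archive could not be created.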
+ */ + def createZipArchives(archives: File, srcFile: File, rootPath: String): Boolean = { + var flag = false + try { + val fileOutStream = new FileOutputStream(archives) + val buffOutStream = new BufferedOutputStream(fileOutStream) + val zipOutStream = new ZipOutputStream(buffOutStream) + flag = doZip(zipOutStream, rootPath, srcFile) + zipOutStream.close() + buffOutStream.close() + fileOutStream.close() + + } catch { + case e: FileNotFoundException => logError("File to zip not found") + } + flag + } + + private def doZip(zipOutStream: ZipOutputStream, curPath: String, file: File): Boolean = { + var flag = false + if (file.isDirectory) { + val files = file.listFiles() + if (files != null && files.length > 0) { + zipOutStream.putNextEntry(new ZipEntry(curPath + File.separator)) + val nextPath = if (curPath.length == 0) "" else curPath + File.separator + for (subFile <- files) { + flag = doZip(zipOutStream, nextPath + subFile.getName, subFile) + } + } + } else { + zipOutStream.putNextEntry(new ZipEntry(curPath)) + val fileInStream = new FileInputStream(file) + val buffInStream = new BufferedInputStream(fileInStream) + val bufSize = 8192 + val buf = new Array[Byte](bufSize) + var len: Int = buffInStream.read(buf, 0, bufSize) + while (len != -1) { + zipOutStream.write(buf, 0, len) + len = buffInStream.read(buf, 0, bufSize) + } + zipOutStream.flush() + flag = true + buffInStream.close() + fileInStream.close() + } + flag + } + } /** diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index d07d36ffac44d..15019b8063091 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -248,7 +248,6 @@ private[spark] class Client( List( (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR), (APP_JAR, args.userJar, CONF_SPARK_USER_JAR), - (PYSPARK_ARCHIVES, pysparkArchives(sparkConf), CONF_PYSPARK_ARCHIVES), ("log4j.properties", oldLog4jConf.orNull, null) ).foreach { case (destName, _localPath, confKey) => val localPath: String = if (_localPath != null) _localPath.trim() else "" @@ -381,19 +380,26 @@ private[spark] class Client( * This sets up the launch environment, java options, and the command for launching the AM. */ private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse) - : ContainerLaunchContext = { + : ContainerLaunchContext = { logInfo("Setting up container launch context for our AM") val appId = newAppResponse.getApplicationId val appStagingDir = getAppStagingDir(appId) val localResources = prepareLocalResources(appStagingDir) val launchEnv = setupLaunchEnv(appStagingDir) + // From SPARK-1920 and SPARK-1520 we know PySpark on Yarn can not work when the assembly jar are // package by JDK 1.7+, so we ship PySpark archives to executors as assembly jar, and add this // path to PYTHONPATH. 
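+    // The PYSPARK_ARCHIVES_PATH indirection is dropped below: the link names of the localized
+    // pyspark and py4j archives are collected and exported directly as PYTHONPATH for the AM
+    // and the executors.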
- for ((resPath, res) <- localResources if resPath.contains(PYSPARK_ARCHIVES)) { - launchEnv("PYSPARK_ARCHIVES_PATH") = resPath + var pysparkArchives = new ArrayBuffer[String]() + for ((resLink, res) <- localResources) { + if (resLink.contains("pyspark") || resLink.contains("py4j")) { + pysparkArchives.+=(resLink) + } } + launchEnv("PYTHONPATH") = pysparkArchives.toArray.mkString(File.pathSeparator) + sparkConf.setExecutorEnv("PYTHONPATH", pysparkArchives.toArray.mkString(File.pathSeparator)) + val amContainer = Records.newRecord(classOf[ContainerLaunchContext]) amContainer.setLocalResources(localResources) amContainer.setEnvironment(launchEnv) @@ -692,7 +698,6 @@ object Client extends Logging { // Alias for the Spark assembly jar, the user jar and PySpark archives val SPARK_JAR: String = "__spark__.jar" val APP_JAR: String = "__app__.jar" - val PYSPARK_ARCHIVES: String = "__pyspark__.zip" // URI scheme that identifies local resources val LOCAL_SCHEME = "local" @@ -704,9 +709,6 @@ object Client extends Logging { val CONF_SPARK_JAR = "spark.yarn.jar" val ENV_SPARK_JAR = "SPARK_JAR" - // Location of any user-defined PySpark archives - val CONF_PYSPARK_ARCHIVES = "spark.pyspark.archives" - // Internal config to propagate the location of the user's jar to the driver/executors val CONF_SPARK_USER_JAR = "spark.yarn.user.jar" @@ -745,28 +747,6 @@ object Client extends Logging { } } - /** - * Find the user-defined PySpark archives if configured, or return default. - * The default pyspark.zip is in the same path with assembly jar. - */ - private def pysparkArchives(conf: SparkConf): String = { - if (conf.contains(CONF_PYSPARK_ARCHIVES)) { - conf.get(CONF_PYSPARK_ARCHIVES) - } else { - SparkContext.jarOfClass(this.getClass) match { - case Some(jarPath) => - val path = new File(jarPath) - val archives = new File(path.getParent + File.separator + "pyspark.zip") - if (archives.exists()) { - archives.getAbsolutePath - } else { - "" - } - case None => "" - } - } - } - /** * Return the path to the given application's staging directory. */ diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 2325ffcb78772..1ce10d906ab23 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -299,12 +299,6 @@ class ExecutorRunnable( } System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v } - - // Add PySpark archives path - sys.env.get("PYSPARK_ARCHIVES_PATH") match { - case Some(pythonArchivesPath) => env("PYSPARK_ARCHIVES_PATH") = pythonArchivesPath - case None => - } env } } From 5d9bcb675d5bc0d619c90039e8a2852f80ae3758 Mon Sep 17 00:00:00 2001 From: linweizhong Date: Wed, 22 Apr 2015 17:44:09 +0800 Subject: [PATCH 09/11] Update --- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 15019b8063091..960e9197ca763 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -380,7 +380,7 @@ private[spark] class Client( * This sets up the launch environment, java options, and the command for launching the AM. 
*/ private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse) - : ContainerLaunchContext = { + : ContainerLaunchContext = { logInfo("Setting up container launch context for our AM") val appId = newAppResponse.getApplicationId @@ -695,7 +695,7 @@ object Client extends Logging { new Client(args, sparkConf).run() } - // Alias for the Spark assembly jar, the user jar and PySpark archives + // Alias for the Spark assembly jar and the user jar val SPARK_JAR: String = "__spark__.jar" val APP_JAR: String = "__app__.jar" From f540384378814cf7465a8d4a0adafec07070cc66 Mon Sep 17 00:00:00 2001 From: linweizhong Date: Sat, 25 Apr 2015 11:45:23 +0800 Subject: [PATCH 10/11] Update --- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index f9e20d1397a74..9aacfdd3e839d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -366,7 +366,7 @@ object SparkSubmit { val py4jPath = Seq(sparkHome, "python", "lib", "py4j-0.8.2.1-src.zip") .mkString(File.separator) args.files = mergeFileLists(args.files, Utils.resolveURIs(archives.getAbsolutePath), - py4jPath) + Utils.resolveURIs(py4jPath)) } } } From db1ba47fa409e60b0e24cd17282a3045c7d3f9df Mon Sep 17 00:00:00 2001 From: linweizhong Date: Sat, 25 Apr 2015 15:15:53 +0800 Subject: [PATCH 11/11] Update --- .../org/apache/spark/deploy/SparkSubmit.scala | 25 +++++++++-------- .../org/apache/spark/deploy/yarn/Client.scala | 27 ++++++++++++++----- 2 files changed, 34 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 9aacfdd3e839d..5e9f9eb9241e7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -357,17 +357,20 @@ object SparkSubmit { } if (args.isPython && clusterManager == YARN) { - // Zip PySpark from ${SPARK_HOME}/python/pyspark to ${SPARK_HOME}/lib/pyspark.zip - // and ship to executors by Yarn. - for (sparkHome <- sys.env.get("SPARK_HOME")) { - val srcFile = new File(Seq(sparkHome, "python", "pyspark").mkString(File.separator)) - val archives = new File(Seq(sparkHome, "lib", "pyspark.zip").mkString(File.separator)) - if (archives.exists() || Utils.createZipArchives(archives, srcFile, "pyspark")) { - val py4jPath = Seq(sparkHome, "python", "lib", "py4j-0.8.2.1-src.zip") - .mkString(File.separator) - args.files = mergeFileLists(args.files, Utils.resolveURIs(archives.getAbsolutePath), - Utils.resolveURIs(py4jPath)) - } + sys.env.get("PYSPARK_ARCHIVES_PATH") match { + case Some(archivesPath) => args.files = mergeFileLists(args.files, archivesPath) + case None => + // Zip PySpark from ${SPARK_HOME}/python/pyspark to ${SPARK_HOME}/lib/pyspark.zip + // and ship to executors by Yarn. 
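+          // Reuse an existing ${SPARK_HOME}/lib/pyspark.zip or build one on the fly, then merge
+          // it and the bundled py4j zip into the files list so YARN localizes both archives on
+          // every container.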
+ for (sparkHome <- sys.env.get("SPARK_HOME")) { + val srcFile = new File(Seq(sparkHome, "python", "pyspark").mkString(File.separator)) + val archives = new File(Seq(sparkHome, "lib", "pyspark.zip").mkString(File.separator)) + if (archives.exists() || Utils.createZipArchives(archives, srcFile, "pyspark")) { + val py4jPath = Seq(sparkHome, "python", "lib", "py4j-0.8.2.1-src.zip") + .mkString(File.separator) + args.files = mergeFileLists(args.files, archives.getAbsolutePath, py4jPath) + } + } } } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 960e9197ca763..126a6a9f9642b 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -391,14 +391,27 @@ private[spark] class Client( // From SPARK-1920 and SPARK-1520 we know PySpark on Yarn can not work when the assembly jar are // package by JDK 1.7+, so we ship PySpark archives to executors as assembly jar, and add this // path to PYTHONPATH. - var pysparkArchives = new ArrayBuffer[String]() - for ((resLink, res) <- localResources) { - if (resLink.contains("pyspark") || resLink.contains("py4j")) { - pysparkArchives.+=(resLink) - } + val pysparkArchives = new ArrayBuffer[String]() + sys.env.get("PYSPARK_ARCHIVES_PATH") match { + case Some(archivesPath) => + archivesPath.split(",").foreach { path => + val uri = new URI(path) + if (uri.getScheme == LOCAL_SCHEME) { + pysparkArchives.+=(uri.getPath) + } else { + pysparkArchives.+=(new File(path).getName) + } + } + case None => + for ((resLink, res) <- localResources) { + if (resLink.contains("pyspark") || resLink.contains("py4j")) { + pysparkArchives.+=(resLink) + } + } } - launchEnv("PYTHONPATH") = pysparkArchives.toArray.mkString(File.pathSeparator) - sparkConf.setExecutorEnv("PYTHONPATH", pysparkArchives.toArray.mkString(File.pathSeparator)) + val pythonPath = pysparkArchives.toArray.mkString(File.pathSeparator) + launchEnv("PYTHONPATH") = pythonPath + sparkConf.setExecutorEnv("PYTHONPATH", pythonPath) val amContainer = Records.newRecord(classOf[ContainerLaunchContext]) amContainer.setLocalResources(localResources)
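
Taken together, the series converges on one mechanism: zip ${SPARK_HOME}/python/pyspark (plus the bundled py4j zip) on the client, ship both archives through YARN so they are localized into each container's working directory, and point PYTHONPATH at the localized names on the driver, AM and executors. The following is a minimal standalone sketch of that flow, not the patch's code; the object and helper names (PysparkArchiveSketch, buildPysparkZip, executorPythonPath) are invented for illustration, and the localized file names are assumed to be "pyspark.zip" and "py4j-0.8.2.1-src.zip" as in the patches above.

import java.io.{BufferedInputStream, File, FileInputStream, FileOutputStream}
import java.util.zip.{ZipEntry, ZipOutputStream}

object PysparkArchiveSketch {

  // Recursively add `file` to the zip stream under the entry name `entryName`.
  private def addToZip(zip: ZipOutputStream, entryName: String, file: File): Unit = {
    if (file.isDirectory) {
      Option(file.listFiles()).getOrElse(Array.empty[File]).foreach { child =>
        addToZip(zip, s"$entryName/${child.getName}", child)
      }
    } else {
      zip.putNextEntry(new ZipEntry(entryName))
      val in = new BufferedInputStream(new FileInputStream(file))
      try {
        val buf = new Array[Byte](8192)
        var n = in.read(buf)
        while (n != -1) {
          zip.write(buf, 0, n)
          n = in.read(buf)
        }
      } finally {
        in.close()
      }
      zip.closeEntry()
    }
  }

  // Zip `srcDir` into `target`, rooting all entries at `rootName` (e.g. "pyspark").
  def buildPysparkZip(srcDir: File, target: File, rootName: String): Boolean = {
    if (!srcDir.isDirectory) {
      false
    } else {
      val zip = new ZipOutputStream(new FileOutputStream(target))
      try {
        addToZip(zip, rootName, srcDir)
        true
      } finally {
        zip.close()
      }
    }
  }

  // PYTHONPATH an executor container would use once YARN has localized the archives:
  // the entries are bare link names because they sit in the container's working directory.
  def executorPythonPath(localizedNames: Seq[String]): String =
    localizedNames.mkString(File.pathSeparator)

  def main(args: Array[String]): Unit = {
    sys.env.get("SPARK_HOME").foreach { sparkHome =>
      val src = new File(Seq(sparkHome, "python", "pyspark").mkString(File.separator))
      val zip = new File(Seq(sparkHome, "lib", "pyspark.zip").mkString(File.separator))
      if (zip.exists() || buildPysparkZip(src, zip, "pyspark")) {
        val pythonPath = executorPythonPath(Seq("pyspark.zip", "py4j-0.8.2.1-src.zip"))
        println(s"would ship ${zip.getAbsolutePath} and set PYTHONPATH=$pythonPath")
      }
    }
  }
}

In an actual submission the archives travel with the application files handled by spark-submit, as the final revision of SparkSubmit.scala above does, rather than by a helper like this sketch.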