From 369e96c4bd0cddd43efa6d19b5fff72216741498 Mon Sep 17 00:00:00 2001 From: takaomag Date: Wed, 8 Apr 2015 05:32:39 +0000 Subject: [PATCH] Add wheel package support for PySpark --- .../spark/deploy/SparkSubmitArguments.scala | 4 +- docs/programming-guide.md | 2 +- docs/submitting-applications.md | 12 +- .../apache/spark/launcher/SparkLauncher.java | 2 +- python/pyspark/context.py | 104 ++++++++++++++---- python/pyspark/tests.py | 16 +++ python/pyspark/worker.py | 41 ++++++- .../testpackage1-0.0.1-py2.py3-none-any.whl | Bin 0 -> 2375 bytes .../testpackage2-0.0.1-py2.py3-none-any.whl | Bin 0 -> 2137 bytes .../yarn/ApplicationMasterArguments.scala | 2 +- .../spark/deploy/yarn/ClientArguments.scala | 2 +- 11 files changed, 147 insertions(+), 38 deletions(-) create mode 100644 python/test_support/testpackage1-0.0.1-py2.py3-none-any.whl create mode 100644 python/test_support/testpackage2-0.0.1-py2.py3-none-any.whl diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 6eb73c43470a..294602a4638a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -447,8 +447,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | coordinates should be groupId:artifactId:version. | --repositories Comma-separated list of additional remote repositories to | search for the maven coordinates given with --packages. - | --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place - | on the PYTHONPATH for Python apps. + | --py-files PY_FILES Comma-separated list of .whl, .egg, .zip or .py files to + | place on the PYTHONPATH for Python apps. | --files FILES Comma-separated list of files to be placed in the working | directory of each executor. | diff --git a/docs/programming-guide.md b/docs/programming-guide.md index f4fabb0927b6..3a6c6ffb1f9b 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -204,7 +204,7 @@ For a complete list of options, run `spark-shell --help`. Behind the scenes, In the PySpark shell, a special interpreter-aware SparkContext is already created for you, in the variable called `sc`. Making your own SparkContext will not work. You can set which master the -context connects to using the `--master` argument, and you can add Python .zip, .egg or .py files +context connects to using the `--master` argument, and you can add Python .whl, .egg, .zip or .py files to the runtime path by passing a comma-separated list to `--py-files`. You can also add dependencies (e.g. Spark Packages) to your shell session by supplying a comma-separated list of maven coordinates to the `--packages` argument. Any additional repositories where dependencies might exist (e.g. SonaType) diff --git a/docs/submitting-applications.md b/docs/submitting-applications.md index 3ecbf2308cd4..47904ff5bf94 100644 --- a/docs/submitting-applications.md +++ b/docs/submitting-applications.md @@ -18,9 +18,9 @@ as `provided` dependencies; these need not be bundled since they are provided by the cluster manager at runtime. Once you have an assembled jar you can call the `bin/spark-submit` script as shown here while passing your jar. -For Python, you can use the `--py-files` argument of `spark-submit` to add `.py`, `.zip` or `.egg` -files to be distributed with your application. 
If you depend on multiple Python files we recommend -packaging them into a `.zip` or `.egg`. +For Python, you can use the `--py-files` argument of `spark-submit` to add `.py`, `.whl`, `.egg` +or `.zip` files to be distributed with your application. If you depend on multiple Python files we +recommend packaging them into a `.whl`, `.egg` or `.zip`. # Launching Applications with spark-submit @@ -62,7 +62,7 @@ the drivers and the executors. Note that `cluster` mode is currently not support Mesos clusters or Python applications. For Python applications, simply pass a `.py` file in the place of `` instead of a JAR, -and add Python `.zip`, `.egg` or `.py` files to the search path with `--py-files`. +and add Python `.whl`, `.egg`, `.zip` or `.py` files to the search path with `--py-files`. There are a few options available that are specific to the [cluster manager](#cluster-overview.html#cluster-manager-types) that is being used. @@ -179,8 +179,8 @@ with `--packages`. All transitive dependencies will be handled when using this c repositories (or resolvers in SBT) can be added in a comma-delimited fashion with the flag `--repositories`. These commands can be used with `pyspark`, `spark-shell`, and `spark-submit` to include Spark Packages. -For Python, the equivalent `--py-files` option can be used to distribute `.egg`, `.zip` and `.py` libraries -to executors. +For Python, the equivalent `--py-files` option can be used to distribute `.whl`, `.egg`, `.zip` +and `.py` libraries to executors. # More Information diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java index b566507ee606..5f03ec8c62c1 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java @@ -225,7 +225,7 @@ public SparkLauncher addFile(String file) { } /** - * Adds a python file / zip / egg to be submitted with the application. + * Adds a python file / zip / whl / egg to be submitted with the application. * * @param file Path to the file. * @return This launcher. diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 78dccc40470e..bbbe0b02c8eb 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -21,6 +21,8 @@ from threading import Lock from tempfile import NamedTemporaryFile +from pip.commands.install import InstallCommand as pip_InstallCommand + from py4j.java_collections import ListConverter from pyspark import accumulators @@ -62,9 +64,9 @@ class SparkContext(object): _next_accum_id = 0 _active_spark_context = None _lock = Lock() - _python_includes = None # zip and egg files that need to be added to PYTHONPATH + _python_includes = None # whl, egg, zip and jar files that need to be added to PYTHONPATH - PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar') + PACKAGE_EXTENSIONS = ('.whl', '.egg', '.zip', '.jar') def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, @@ -77,9 +79,9 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, (e.g. mesos://host:port, spark://host:port, local[4]). :param appName: A name for your job, to display on the cluster web UI. :param sparkHome: Location where Spark is installed on cluster nodes. - :param pyFiles: Collection of .zip or .py files to send to the cluster - and add to PYTHONPATH. 
These can be paths on the local file - system or HDFS, HTTP, HTTPS, or FTP URLs. + :param pyFiles: Collection of .py, .whl, .egg or .zip files to send + to the cluster and add to PYTHONPATH. These can be paths on + the local file system or HDFS, HTTP, HTTPS, or FTP URLs. :param environment: A dictionary of environment variables to set on worker nodes. :param batchSize: The number of Python objects represented as a single @@ -178,18 +180,24 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, sys.path.insert(1, root_dir) # Deploy any code dependencies specified in the constructor + # Wheel files will be installed by pip later. self._python_includes = list() - for path in (pyFiles or []): - self.addPyFile(path) + if pyFiles: + for path in pyFiles: + self.addFile(path) + self._include_python_packages(paths=pyFiles) + else: + pyFiles = [] # Deploy code dependencies set by spark-submit; these will already have been added - # with SparkContext.addFile, so we just need to add them to the PYTHONPATH - for path in self._conf.get("spark.submit.pyFiles", "").split(","): - if path != "": - (dirname, filename) = os.path.split(path) - if filename[-4:].lower() in self.PACKAGE_EXTENSIONS: - self._python_includes.append(filename) - sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename)) + # with SparkContext.addFile, so we just need to include them. + # Wheel files will be installed by pip later. + spark_submit_pyfiles = self._conf.get("spark.submit.pyFiles", "").split(",") + if spark_submit_pyfiles: + self._include_python_packages(paths=spark_submit_pyfiles) + + # Install all wheel files at once. + self._install_wheel_files(paths=pyFiles + spark_submit_pyfiles) # Create a temporary directory inside spark.local.dir: local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf()) @@ -693,23 +701,71 @@ def clearFiles(self): Clear the job's list of files added by L{addFile} or L{addPyFile} so that they do not get downloaded to any new nodes. """ - # TODO: remove added .py or .zip files from the PYTHONPATH? + # TODO: remove added .py, .whl, .egg or .zip files from the PYTHONPATH? self._jsc.sc().clearFiles() def addPyFile(self, path): """ - Add a .py or .zip dependency for all tasks to be executed on this - SparkContext in the future. The C{path} passed can be either a local - file, a file in HDFS (or other Hadoop-supported filesystems), or an - HTTP, HTTPS or FTP URI. + Add a .py, .whl, .egg or .zip dependency for all tasks to be + executed on this SparkContext in the future. The C{path} passed can + be either a local file, a file in HDFS (or other Hadoop-supported + filesystems), or an HTTP, HTTPS or FTP URI. """ self.addFile(path) - (dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix + self._include_python_packages(paths=(path,)) + self._install_wheel_files(paths=(path,)) - if filename[-4:].lower() in self.PACKAGE_EXTENSIONS: - self._python_includes.append(filename) - # for tests in local mode - sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename)) + def _include_python_packages(self, paths): + """ + Add Python package dependencies. Python packages (except for .whl) are + added to PYTHONPATH. 
+ """ + root_dir = SparkFiles.getRootDirectory() + for path in paths: + basename = os.path.basename(path) + extname = os.path.splitext(basename)[1].lower() + if extname in self.PACKAGE_EXTENSIONS \ + and basename not in self._python_includes: + self._python_includes.append(basename) + if extname != '.whl': + # Prepend the python package (except for *.whl) to sys.path + sys.path.insert(1, os.path.join(root_dir, basename)) + + def _install_wheel_files( + self, + paths, + quiet=True, + upgrade=True, + no_deps=True, + no_index=True, + ): + """ + Install .whl files at once by pip install. + """ + root_dir = SparkFiles.getRootDirectory() + paths = { + os.path.join(root_dir, os.path.basename(path)) + for path in paths + if os.path.splitext(path)[1].lower() == '.whl' + } + if not paths: + return + + pip_args = [ + '--find-links', root_dir, + '--target', os.path.join(root_dir, 'site-packages'), + ] + if quiet: + pip_args.append('--quiet') + if upgrade: + pip_args.append('--upgrade') + if no_deps: + pip_args.append('--no-deps') + if no_index: + pip_args.append('--no-index') + pip_args.extend(paths) + + pip_InstallCommand().main(args=pip_args) def setCheckpointDir(self, dirName): """ diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index dd8d3b1c5373..c93f948707c1 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -367,6 +367,22 @@ def func(): from userlib import UserClass self.assertEqual("Hello World from inside a package!", UserClass().hello()) + def test_add_whl_file_locally(self): + # To ensure that we're actually testing addPyFile's effects, check that + # this fails due to `testpackage1` or `testpackage2` not being on the + # Python path: + def func(): + from testpackage2 import TestPackage1Class + self.assertRaises(ImportError, func) + paths = [ + os.path.join(SPARK_HOME, "python/test_support/testpackage1-0.0.1-py2.py3-none-any.whl"), + os.path.join(SPARK_HOME, "python/test_support/testpackage2-0.0.1-py2.py3-none-any.whl"), + ] + for path in paths: + self.sc.addPyFile(path) + from testpackage2 import TestPackage1Class + self.assertEqual("Hello World from inside a package!", TestPackage1Class().hello()) + def test_overwrite_system_module(self): self.sc.addPyFile(os.path.join(SPARK_HOME, "python/test_support/SimpleHTTPServer.py")) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 8a93c320ec5d..30671726c8f0 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -18,12 +18,15 @@ """ Worker that receives input from Piped RDD. 
""" +import fcntl import os import sys import time import socket import traceback +from pip.commands.install import InstallCommand as pip_InstallCommand + from pyspark.accumulators import _accumulatorRegistry from pyspark.broadcast import Broadcast, _broadcastRegistry from pyspark.files import SparkFiles @@ -66,12 +69,46 @@ def main(infile, outfile): SparkFiles._root_directory = spark_files_dir SparkFiles._is_running_on_worker = True - # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH add_path(spark_files_dir) # *.py files that were added will be copied here + + # fetch names of includes and construct PYTHONPATH if the file extension + # is not '.whl' num_python_includes = read_int(infile) + wheel_files = set() for _ in range(num_python_includes): filename = utf8_deserializer.loads(infile) - add_path(os.path.join(spark_files_dir, filename)) + path = os.path.join(spark_files_dir, filename) + if os.path.splitext(filename)[1].lower() == '.whl': + wheel_files.add(path) + else: + add_path(path) + + if wheel_files: + # Install wheel files + + local_site_packages_dir = os.path.join( + spark_files_dir, + 'site-packages', + ) + with open(os.path.join( + spark_files_dir, + '.pyspark_pip_install.lock' + ), 'w') as f: + fcntl.flock(f.fileno(), fcntl.LOCK_EX) + try: + if os.path.exists(local_site_packages_dir) is False: + # '--no-deps' is not set. + # All dependencies must be there. + pip_InstallCommand().main(args=[ + '--quiet', + '--upgrade', + '--no-index', + '--target', local_site_packages_dir, + '--find-links', spark_files_dir, + ] + list(wheel_files)) + finally: + fcntl.flock(f.fileno(), fcntl.LOCK_UN) + add_path(local_site_packages_dir) # fetch names and values of broadcast variables num_broadcast_variables = read_int(infile) diff --git a/python/test_support/testpackage1-0.0.1-py2.py3-none-any.whl b/python/test_support/testpackage1-0.0.1-py2.py3-none-any.whl new file mode 100644 index 0000000000000000000000000000000000000000..76b9ff834a1bb3ac7977c68aa9e6fdc28bf17085 GIT binary patch literal 2375 zcmWIWW@Zs#U|`^2xX#hxHtoJXgEo+70K^hNT#{N`QjnOOotU0#s2?AnnU`4-AFo$X z>8*Y0e5i(}uAZic&pAIIf72_ceb1Z;K5<6(guYfF2cs8@gX)}Oq3-UT3;}Qp#Dd%1 z^qE`_mjex+3&bF6Q7p(!%*+ECuG?!U#H1+VTfC9~Je9 zCQnb#J`$Ckcd9|}#TCB0L2Fm8nZTjn$gxI8$1bo}cH76BD<4@s>He(0{ZqlLl!?bD z?|a|RrranfT|CWo=41JDCg1$EH$0Dgq^HY%L@j^GqRcy%SMI+$H%lMkP#w+=w>aJU zOFTdaasjapnnQIB^bGV2^-?m6OLQ~y((?6PT!Wp1JOe^J{r&Wcic7e^e*X5E|NG-d zkC>3W4>EPl29fa9K-0beu@)gyb5lzaQxZ!O^|Ffd^L+bv^BpqaajjQ9#Uj+@bo2-p zk4{6wUZb^{tIhaT^#VfwSR0lcc>M9=-ftSZ5>v7jIh89lqE-}WiaSZEXf!I+Cv@~~ zev{($d6iQ);|bQZ?VCEM*k>Q{JG13fnAiEpuDyl=Y+-*Kwr)=H5-%5Tkzdq)fh#Zd z_j(uYwfn>$bYC#)dpgVB|@fzd$_v#aG&?^J$sSYTUYDcne&^246YbI zDDpq+t?Q+8Qh!sBhu29Ry-t1hm4O!wE*KkKG`{eJ>(qJui(hrF@oMOLojvKZDM&+8 z%l9dlr_c5vjTI)B&YnK+ebx8O>XuFkWtSaj$>s2ecdpGqXPp3If}Zqs4RLgF3~`L@ zF$AZS^7)6vJ*F6?B|9@Ysj{32GZfgGw@5{{s#>Wf>d3SI|LX0E7Fvr2h5OxXzxc>a zxEsq_52^Xae{nU)d$)BpYGZ0T3`3BVVTB!4>bk&cl^3}2pCUH!1z)o6v#oY&i+9z ztrt%gEeeomdsw@wNZmoAf7c6E1tT|sfFry|3sU61$S|F~TAuHts-5?sa{vB19znTB zTl$Jm)VKuRoNCdv;-P+H*9_4E^H-Gr3fi*f%aPdUnUglJoa46E%<6(gpJDp09|yK` zYdq0X|MB*>^uL!|3vd42@WoqLK0j`=;g?1Ab`IsUGqzMuS$I>tL~Of*+>uV3hZ*&1 z5{aLsl{34RNoZ*cs-1p!uU_Y=!=fL2@w;a5%$oHq%;}n*(Um+a4byezkB#o+SWZ2} z>)#gi%|u^IX@;oucjqey^lt=I99!RDzIB}i+w+wh_C8z}u__`fD5~9a^7Xv(Q_}2C zo|)pO|Ju)w)~wOmw?n-4W23j~3#wSiC zVqUx_%dABa_y7MbddcwXu-&}c1Kgg9+x7w8j7%cTxN8<*Isk(ujUWm+_afIb$fjVcJ|Jc=Ff3_IL^lOmnIM~htz1W# zzy(aA$nL->MbJulbd%A`c7(|ZgiJ;$>Cw$aFF+CI1~Ua7p~~$p_hLM^ZJ2h3Es$pm4@g>q8CO8BSTpT7zrtw0=!w-K$aT#{N`QjnOOotU0#q#qxjnU`4-AFo$X 
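
With this patch applied, a wheel dependency is handed to PySpark the same way .egg and .zip files already are, either on the pyspark/spark-submit command line via --py-files or from the driver program itself. The following is a minimal driver-side sketch; the wheel paths and the `mylib`/`otherlib` modules are placeholders for illustration, not files that ship with this patch.

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("wheel-deps-example")

# .whl files listed in pyFiles are shipped to the executors and pip-installed
# into a job-local 'site-packages' directory; .py, .egg and .zip files are
# still simply prepended to the PYTHONPATH.
sc = SparkContext(master="local[2]", conf=conf,
                  pyFiles=["/path/to/mylib-1.0.0-py2.py3-none-any.whl"])

# A wheel can also be added after the context exists.
sc.addPyFile("/path/to/otherlib-2.0.0-py2.py3-none-any.whl")


def uses_wheel(x):
    import mylib  # resolved from the wheel installed on the worker
    return (mylib.__name__, x)


print(sc.parallelize(range(3)).map(uses_wheel).collect())
sc.stop()

The command-line equivalent is to list the wheel in --py-files when invoking spark-submit or pyspark, exactly as the documentation hunks above describe.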