From 050651f4c55bdb8d61277bff6c7c382ef9623f1b Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Fri, 14 Nov 2014 15:31:11 -0800
Subject: [PATCH 1/3] JVM should exit after Python exit

---
 bin/pyspark                                                  | 2 --
 bin/pyspark2.cmd                                             | 1 -
 .../apache/spark/deploy/SparkSubmitDriverBootstrapper.scala | 2 +-
 python/pyspark/java_gateway.py                               | 4 +++-
 4 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/bin/pyspark b/bin/pyspark
index 1d8c94d43d285..0b4f695dd06dd 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -132,7 +132,5 @@ if [[ "$1" =~ \.py$ ]]; then
   gatherSparkSubmitOpts "$@"
   exec "$FWDIR"/bin/spark-submit "${SUBMISSION_OPTS[@]}" "$primary" "${APPLICATION_OPTS[@]}"
 else
-  # PySpark shell requires special handling downstream
-  export PYSPARK_SHELL=1
   exec "$PYSPARK_DRIVER_PYTHON" $PYSPARK_DRIVER_PYTHON_OPTS
 fi
diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd
index 59415e9bdec2c..a542ec80b49d6 100644
--- a/bin/pyspark2.cmd
+++ b/bin/pyspark2.cmd
@@ -59,7 +59,6 @@ for /f %%i in ('echo %1^| findstr /R "\.py"') do (
 )
 
 if [%PYTHON_FILE%] == [] (
-  set PYSPARK_SHELL=1
   if [%IPYTHON%] == [1] (
     ipython %IPYTHON_OPTS%
   ) else (
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index 2b894a796c8c6..0ea4fc814f9c0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -139,7 +139,7 @@ private[spark] object SparkSubmitDriverBootstrapper {
     // subprocess there already reads directly from our stdin, so we should avoid spawning a
     // thread that contends with the subprocess in reading from System.in.
     val isWindows = Utils.isWindows
-    val isPySparkShell = sys.env.contains("PYSPARK_SHELL")
+    val isPySparkShell = sys.env.contains("PYSPARK_INSIDE_PYTHON")
     if (!isWindows) {
       val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
       stdinThread.start()
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 9c70fa5c16d0c..a5fce0bb02e49 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -45,7 +45,9 @@ def launch_gateway():
         # Don't send ctrl-c / SIGINT to the Java gateway:
         def preexec_func():
             signal.signal(signal.SIGINT, signal.SIG_IGN)
-        proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func)
+        env = dict(os.environ)
+        env["PYSPARK_INSIDE_PYTHON"] = "1"  # tell JVM to exit after python exits
+        proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func, env=env)
     else:
         # preexec_fn not supported on Windows
         proc = Popen(command, stdout=PIPE, stdin=PIPE)
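Aside from the shell-script cleanup, the mechanism in this first patch is a standard subprocess pattern: copy the parent's environment, add a marker variable, and pass the copy to Popen so the child can detect how it was launched. A minimal standalone sketch of that pattern (the variable name CHILD_FLAG and the echoed command are placeholders, not Spark code):

    import os
    import signal
    from subprocess import PIPE, Popen

    env = dict(os.environ)   # copy, so the parent's environment is untouched
    env["CHILD_FLAG"] = "1"  # marker the child process can test for

    def preexec_func():
        # Keep ctrl-c (SIGINT) in the parent from reaching the child.
        signal.signal(signal.SIGINT, signal.SIG_IGN)

    proc = Popen(["sh", "-c", "echo $CHILD_FLAG"],
                 stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func, env=env)
    print(proc.communicate()[0])  # b'1\n' -- the child saw the marker

As in java_gateway.py above, preexec_fn is POSIX-only, which is why the Windows branch omits it.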
From ce8599cb79c5acfd6aea4311f5959e3a6cfbadcd Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Fri, 14 Nov 2014 16:04:53 -0800
Subject: [PATCH 2/3] address comments

---
 .../apache/spark/deploy/SparkSubmitDriverBootstrapper.scala | 4 ++--
 python/pyspark/java_gateway.py                               | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index 0ea4fc814f9c0..8fe7848a2bcb6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -139,14 +139,14 @@ private[spark] object SparkSubmitDriverBootstrapper {
     // subprocess there already reads directly from our stdin, so we should avoid spawning a
     // thread that contends with the subprocess in reading from System.in.
     val isWindows = Utils.isWindows
-    val isPySparkShell = sys.env.contains("PYSPARK_INSIDE_PYTHON")
+    val isPySpark = sys.env.contains("PYSPARK")
     if (!isWindows) {
       val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
       stdinThread.start()
       // For the PySpark shell, Spark submit itself runs as a python subprocess, and so this JVM
       // should terminate on broken pipe, which signals that the parent process has exited. In
       // Windows, the termination logic for the PySpark shell is handled in java_gateway.py
-      if (isPySparkShell) {
+      if (isPySpark) {
         stdinThread.join()
         process.destroy()
       }
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index a5fce0bb02e49..2773091537f03 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -46,7 +46,7 @@ def launch_gateway():
         def preexec_func():
             signal.signal(signal.SIGINT, signal.SIG_IGN)
         env = dict(os.environ)
-        env["PYSPARK_INSIDE_PYTHON"] = "1"  # tell JVM to exit after python exits
+        env["PYSPARK"] = "1"  # tell JVM to exit after python exits
         proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func, env=env)
     else:
         # preexec_fn not supported on Windows
From df0e524a78096b8fcc634f1349508936199e83fc Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Fri, 14 Nov 2014 18:12:30 -0800
Subject: [PATCH 3/3] address comments

---
 .../spark/deploy/SparkSubmitDriverBootstrapper.scala | 11 ++++++-----
 python/pyspark/java_gateway.py                        |  2 +-
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index 8fe7848a2bcb6..5981482ac6fb2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -139,14 +139,15 @@ private[spark] object SparkSubmitDriverBootstrapper {
     // subprocess there already reads directly from our stdin, so we should avoid spawning a
     // thread that contends with the subprocess in reading from System.in.
     val isWindows = Utils.isWindows
-    val isPySpark = sys.env.contains("PYSPARK")
+    val isSubprocess = sys.env.contains("IS_SUBPROCESS")
     if (!isWindows) {
       val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
       stdinThread.start()
-      // For the PySpark shell, Spark submit itself runs as a python subprocess, and so this JVM
-      // should terminate on broken pipe, which signals that the parent process has exited. In
-      // Windows, the termination logic for the PySpark shell is handled in java_gateway.py
-      if (isPySpark) {
+      // Spark submit (JVM) may run as a subprocess, and so this JVM should terminate on
+      // broken pipe, signaling that the parent process has exited. This is the case if the
+      // application is launched directly from python, as in the PySpark shell. In Windows,
+      // the termination logic is handled in java_gateway.py
+      if (isSubprocess) {
         stdinThread.join()
         process.destroy()
       }
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 2773091537f03..a975dc19cb78e 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -46,7 +46,7 @@ def launch_gateway():
         def preexec_func():
             signal.signal(signal.SIGINT, signal.SIG_IGN)
         env = dict(os.environ)
-        env["PYSPARK"] = "1"  # tell JVM to exit after python exits
+        env["IS_SUBPROCESS"] = "1"  # tell JVM to exit after python exits
         proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func, env=env)
     else:
         # preexec_fn not supported on Windows
         proc = Popen(command, stdout=PIPE, stdin=PIPE)
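Taken together, the final shape of the fix is: java_gateway.py launches the JVM with IS_SUBPROCESS=1, and the bootstrapper's stdin-redirect thread doubles as a parent-death detector, because reading from stdin returns EOF once the Python parent exits and the pipe breaks. A rough standalone Python analogue of the bootstrapper's unix branch (the real code is Scala; "cat" is a placeholder for the wrapped spark-submit subprocess):

    import os
    import shutil
    import subprocess
    import sys
    import threading

    # Placeholder for the spark-submit subprocess that the bootstrapper wraps.
    inner = subprocess.Popen(["cat"], stdin=subprocess.PIPE)

    def redirect_stdin():
        # Copy our stdin into the child's stdin; copyfileobj returns at EOF,
        # i.e. once the parent that spawned us has exited and the pipe broke.
        shutil.copyfileobj(sys.stdin.buffer, inner.stdin)

    stdin_thread = threading.Thread(target=redirect_stdin, name="redirect stdin")
    stdin_thread.start()
    if os.environ.get("IS_SUBPROCESS"):  # set by java_gateway.py above
        stdin_thread.join()              # blocks until stdin hits EOF
        inner.terminate()                # analogue of process.destroy()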