2 changes: 0 additions & 2 deletions bin/pyspark
@@ -132,7 +132,5 @@ if [[ "$1" =~ \.py$ ]]; then
   gatherSparkSubmitOpts "$@"
   exec "$FWDIR"/bin/spark-submit "${SUBMISSION_OPTS[@]}" "$primary" "${APPLICATION_OPTS[@]}"
 else
-  # PySpark shell requires special handling downstream
-  export PYSPARK_SHELL=1
   exec "$PYSPARK_DRIVER_PYTHON" $PYSPARK_DRIVER_PYTHON_OPTS
 fi
1 change: 0 additions & 1 deletion bin/pyspark2.cmd
@@ -59,7 +59,6 @@ for /f %%i in ('echo %1^| findstr /R "\.py"') do (
 )
 
 if [%PYTHON_FILE%] == [] (
-  set PYSPARK_SHELL=1
   if [%IPYTHON%] == [1] (
     ipython %IPYTHON_OPTS%
   ) else (
11 changes: 6 additions & 5 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -139,14 +139,15 @@ private[spark] object SparkSubmitDriverBootstrapper {
     // subprocess there already reads directly from our stdin, so we should avoid spawning a
     // thread that contends with the subprocess in reading from System.in.
     val isWindows = Utils.isWindows
-    val isPySparkShell = sys.env.contains("PYSPARK_SHELL")
+    val isSubprocess = sys.env.contains("IS_SUBPROCESS")
     if (!isWindows) {
       val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
       stdinThread.start()
-      // For the PySpark shell, Spark submit itself runs as a python subprocess, and so this JVM
-      // should terminate on broken pipe, which signals that the parent process has exited. In
-      // Windows, the termination logic for the PySpark shell is handled in java_gateway.py
-      if (isPySparkShell) {
+      // Spark submit (JVM) may can runs as a subprocess, and so this JVM should terminate on
[Review comment from a Contributor, on the "may can runs" line above] "can run". I'll fix this when I merge it

+      // broken pipe, signaling that the parent process has exited. This is the case if the
+      // application is launched directly from python, as in the PySpark shell. In Windows,
+      // the termination logic is handled in java_gateway.py
+      if (isSubprocess) {
         stdinThread.join()
         process.destroy()
       }
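The hunk above implements a parent-liveness handshake: the bootstrapper pumps its own stdin into the spark-submit child via RedirectThread, and when IS_SUBPROCESS is set it joins that thread, so EOF or a broken pipe on stdin (the parent python process exiting) unblocks the join and lets it destroy the child before exiting itself. Below is a minimal Python sketch of the same pattern, not Spark code: the cat child, the bootstrap() wrapper, and the thread body are illustrative stand-ins.

import os
import shutil
import subprocess
import sys
import threading

def bootstrap():
    # Stand-in for the spark-submit JVM; any long-running child will do.
    child = subprocess.Popen(["cat"], stdin=subprocess.PIPE)

    def redirect_stdin():
        # Mirror RedirectThread: pump our stdin into the child until EOF.
        try:
            shutil.copyfileobj(sys.stdin.buffer, child.stdin)
            child.stdin.close()  # propagate EOF so the child can finish
        except BrokenPipeError:
            pass  # child went away first; nothing left to feed it

    stdin_thread = threading.Thread(target=redirect_stdin, daemon=True)
    stdin_thread.start()

    if "IS_SUBPROCESS" in os.environ:
        # Our stdin is a pipe held by the parent, so EOF means the parent
        # has exited: tear the child down and exit too.
        stdin_thread.join()
        child.terminate()

    sys.exit(child.wait())

if __name__ == "__main__":
    bootstrap()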
4 changes: 3 additions & 1 deletion python/pyspark/java_gateway.py
@@ -45,7 +45,9 @@ def launch_gateway():
         # Don't send ctrl-c / SIGINT to the Java gateway:
         def preexec_func():
             signal.signal(signal.SIGINT, signal.SIG_IGN)
-        proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func)
+        env = dict(os.environ)
+        env["IS_SUBPROCESS"] = "1"  # tell JVM to exit after python exits
+        proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func, env=env)
     else:
         # preexec_fn not supported on Windows
         proc = Popen(command, stdout=PIPE, stdin=PIPE)
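The java_gateway.py hunk keeps the whole inherited environment and adds a single marker variable, instead of exporting PYSPARK_SHELL from the launcher scripts. A small usage sketch of that Popen idiom follows; the placeholder command stands in for the real spark-submit command list built earlier in launch_gateway():

import os
import sys
from subprocess import PIPE, Popen

# Placeholder child that just reports whether the marker arrived.
command = [sys.executable, "-c",
           "import os; print(os.environ.get('IS_SUBPROCESS'))"]

env = dict(os.environ)      # copy, don't mutate, the parent's environment
env["IS_SUBPROCESS"] = "1"  # the JVM keys its shutdown handling off this

proc = Popen(command, stdout=PIPE, stdin=PIPE, env=env)
print(proc.stdout.readline().decode().strip())  # prints: 1
proc.stdin.close()
proc.wait()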