
Conversation

@WeichenXu123 (Contributor) commented Jul 12, 2019

What changes were proposed in this pull request?

PySpark worker daemon reads from stdin the worker PIDs to kill.

worker_pid = read_int(stdin_bin)

However, the worker process is forked from the daemon process, and we didn't close stdin in the child after the fork. This means the child, and hence the user program, can read from stdin as well, which blocks the daemon from receiving the PID to kill. This can cause issues because the task reaper might detect that the task was not terminated and eventually kill the JVM.

This PR fixes this by redirecting the standard input of the forked child to devnull.
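For reference, a minimal sketch of the idea in the daemon's fork path (paraphrased, not the exact diff):

import os

pid = os.fork()
if pid == 0:
    # Child: occupy file descriptor 0 with /dev/null so that any read from
    # stdin (by the worker or by user code) sees EOF immediately, leaving
    # the daemon as the sole reader of the real stdin.
    os.dup2(os.open(os.devnull, os.O_RDWR), 0)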

How was this patch tested?

Manual testing.

In pyspark, run:

import subprocess

def task(_):
    # "cat" inherits the worker's stdin and blocks until it sees EOF.
    subprocess.check_output(["cat"])
    return []

sc.parallelize(range(1), 1).mapPartitions(task).count()

Before:
The job gets stuck; pressing Ctrl+C cancels the job, but the Python worker process does not exit.

After:
The job finishes correctly. "cat" prints nothing (because the dummy stdin is "/dev/null"), and the Python worker process exits normally.


@srowen (Member) left a comment

Seems reasonable to me. Maybe CC @JoshRosen

@SparkQA commented Jul 12, 2019

Test build #107602 has finished for PR 25138 at commit a17a86a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen (Contributor) commented Jul 13, 2019

Good catch. This seems reasonable to me.

One question, though: is it possible to add a regression test for this? Here's some brainstorming on how we might do that:

  • Prior to this patch's change, the example you gave would hang indefinitely upon cancellation, whereas afterwards it'll exit quickly, so maybe we can add a Python unit test alongside existing Python-initiated cancel tests (assuming that we have those tests in .py files or some other type of test case that lets us run PySpark code).
  • In our test, could we start an async job, wait until it's running, kill the job, then assert that the number of running tasks (as reported by executor status API) reaches 0 within some reasonable time period?
  • One risk in my proposal is race conditions in waiting for the tasks to kick off: if we cancel a task before it forks a child process then our test will spuriously pass (i.e. it wouldn't function as a reliable regression test). Fixing this is kinda tricky, though, because our usual trick of using semaphores / countdown latches doesn't work well in a cross-language world. Maybe our best option is to just hardcode some reasonable time constants there? Or have the Python code touch a file and then wait for that file to appear before killing the task (e.g. a subprocess of touch /filename/picked/by/driver/in/advance && cat).

Just thinking aloud here; let me know if you can think of a cleaner way to test this (or whether we can regression-test this via some other means).
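For illustration, a rough sketch of the file-based handshake idea (assuming local mode, so the driver and executors share a filesystem, and a SparkContext `sc` in scope; this is brainstorming, not a committed test):

import os
import subprocess
import tempfile
import threading
import time

# Marker file chosen by the driver in advance; the task touches it right
# before blocking, so the driver knows the child process has started.
marker = os.path.join(tempfile.mkdtemp(), "task_started")

def task(_):
    open(marker, "w").close()          # signal the driver
    subprocess.check_output(["cat"])   # blocks on the inherited stdin
    return []

def run_job():
    try:
        sc.parallelize(range(1), 1).mapPartitions(task).count()
    except Exception:
        pass  # cancellation surfaces as an exception here

t = threading.Thread(target=run_job)
t.start()
while not os.path.exists(marker):      # wait until the task really started
    time.sleep(0.1)
sc.cancelAllJobs()
# Before the fix the worker never exits; after it, the thread should join
# within a reasonable timeout.
t.join(timeout=30)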

@HyukjinKwon (Member)

Yup, actually I suggested this approach in the first place but didn't have enough time to verify the details. +1 for @JoshRosen's comment.

@WeichenXu123 (Contributor, Author) commented Jul 15, 2019

@JoshRosen @srowen @HyukjinKwon
After testing this further, I found another issue. We cannot simply close stdin. Instead, we should open another file to occupy file descriptor 0. Otherwise, file descriptor 0 could be allocated to another file, such as here:

infile = os.fdopen(os.dup(sock.fileno()), "rb", buffer_size)

(i.e. at L58 the duplicated socket would be allocated file descriptor 0)
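For illustration, a minimal standalone sketch (not Spark code) of why this happens: POSIX always allocates the lowest free descriptor, so after closing fd 0 the very next descriptor created lands on it.

import os
import socket

a, b = socket.socketpair()
os.close(0)                      # frees file descriptor 0...
assert os.dup(a.fileno()) == 0   # ...so the duplicated socket lands on it

# Keeping fd 0 occupied avoids this: point it at /dev/null instead.
os.dup2(os.open(os.devnull, os.O_RDWR), 0)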

So I updated my code. If this looks OK, I will then update the test.
Thanks!

@SparkQA commented Jul 15, 2019

Test build #107689 has finished for PR 25138 at commit 8eecf44.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@WeichenXu123 (Contributor, Author)

Jenkins, retest this please.

@SparkQA commented Jul 15, 2019

Test build #107691 has finished for PR 25138 at commit 8eecf44.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 16, 2019

Test build #107709 has finished for PR 25138 at commit b6ceb30.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 16, 2019

Test build #107710 has finished for PR 25138 at commit be73d73.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

res = sys.stdin.read()
# Because stdin is redirected to '/dev/null', reading from it
# returns EOF (an empty string) immediately.
assert res == '', "Expected to read EOF from stdin."
@WeichenXu123 (Contributor, Author) commented:

This verifies that reading from stdin returns EOF immediately.
Should we add more tests, such as verifying that the worker process actually exits?
I think the current test is enough: the fact that we can only read EOF from stdin shows that stdin is a dummy, safe file descriptor that won't affect other file descriptors in the daemon.

@WeichenXu123 (Contributor, Author) left a comment

My thoughts about the test.

@SparkQA commented Jul 16, 2019

Test build #107714 has finished for PR 25138 at commit 3fbfe56.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@WeichenXu123 (Contributor, Author)

Gently ping @JoshRosen

@WeichenXu123 (Contributor, Author)

Gently ping @HyukjinKwon @ueshin

@srowen (Member) commented Jul 29, 2019

I don't know this part well, but given the analysis and test, seems OK? @HyukjinKwon @JoshRosen

@HyukjinKwon (Member) commented Jul 29, 2019

Seems okay from a cursory look, but I will take a closer look since this touches PySpark's core path. I have been stuck on some other work; I'll take a look within a day and leave some comments.

Also, I hope @JoshRosen can have a chance to take a look as well.

@HyukjinKwon (Member) left a comment

Looks good to me. @JoshRosen, mind double-checking when you have a chance?

@HyukjinKwon changed the title from "[SPARK-26175][PYSPARK] Closing stdin of the worker process right after fork" to "[SPARK-26175][PYTHON] Redirect the standard input of the forked child to devnull in daemon" on Jul 30, 2019
@HyukjinKwon (Member)

@WeichenXu123, can you update the PR description as well when you address the comments?

@SparkQA commented Jul 30, 2019

Test build #108392 has finished for PR 25138 at commit 51b1a66.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 30, 2019

Test build #108390 has finished for PR 25138 at commit 22a4a2c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member)

retest this please

@SparkQA commented Jul 30, 2019

Test build #108401 has finished for PR 25138 at commit 51b1a66.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen (Contributor)

I took a quick cursory look and this seems reasonable to me.

/cc @GregOwen @srinathshankar as FYI

@HyukjinKwon (Member)

Merged to master.
