[SPARK-2764] Simplify daemon.py process structure #1680
Conversation
QA tests have started for PR 1680. This patch merges cleanly.
This code is fairly complex and I've probably made a mistake somewhere (registered a bad signal handler, ignored an exception, etc.), so I'd greatly appreciate a thorough review. There might also be an opportunity to fix SPARK-1740 in this PR. For now, this is [WIP]; I'd like to thoroughly comment this code before I commit it. /cc @davies (who has an outstanding PR, #1643, that modifies this code), @aarondav (who has contributed bugfixes to this code), and @jey (who contributed the original daemon.py).
This file had two slightly different versions of this "cleanup zombie children" handle_sigchld function, so it might be a good idea to understand whether there's a reason why they differed.
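For reference, a minimal sketch of what a "cleanup zombie children" SIGCHLD handler typically looks like (illustrative only, not the exact code from this file):

```python
import os
import signal

def handle_sigchld(signum, frame):
    # Reap every child that has already exited so it doesn't linger as
    # a zombie. WNOHANG makes waitpid() return (0, 0) immediately
    # instead of blocking when no child has terminated yet.
    try:
        while True:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0:
                break  # no more exited children right now
    except OSError:
        pass  # ECHILD: no children at all; never let this escape the handler

signal.signal(signal.SIGCHLD, handle_sigchld)
```

If the two versions differ in how they guard against exceptions escaping the handler, that difference matters: an exception raised inside a signal handler propagates into whatever code the signal happened to interrupt.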
If you want to test this out, here's a neat experiment you can run in an interactive `pyspark` shell:

```python
from time import sleep

def wait(x):
    sleep(10)  # 10 seconds
    return x

sc.parallelize(range(10000)).mapPartitions(wait).count()
```

Prior to this patch, when I ran this with […]. After applying this patch, I see a peak of 5 Python processes running, one for the python driver, one for […]. This patch may have broken task cancellation; I'd appreciate help in designing a regression test suite that checks for timely shutdown of Python workers when jobs are cancelled. I'm considering an approach where we do something similar to this […]
QA results for PR 1680:
@aarondav it would be great if you could take a look and review this.
select() should have a timeout here; otherwise, after SIGTERM, it will not exit until events happen.
Does this updated code still need a timeout? The current code exits directly in the SIGTERM handler.
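To illustrate the two alternatives being discussed, here's a hedged sketch (the socket setup and loop body are stand-ins, not the actual daemon.py code):

```python
import select
import signal
import socket

listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listen_sock.bind(("127.0.0.1", 0))
listen_sock.listen(5)

shutdown = False

def handle_sigterm(signum, frame):
    # Alternative 1: just set a flag; the main loop exits on its own.
    # (Alternative 2 would call sys.exit() right here, in which case
    # the select() timeout below is unnecessary.)
    global shutdown
    shutdown = True

signal.signal(signal.SIGTERM, handle_sigterm)

while not shutdown:
    # Without the 1-second timeout, select() would block until a
    # connection arrives, so a SIGTERM delivered while the daemon is
    # idle would not take effect until the next event.
    ready, _, _ = select.select([listen_sock], [], [], 1.0)
    if ready:
        conn, _ = listen_sock.accept()
        # ... fork a worker to handle conn ...
```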
Why did we have multiple listening processes before? And what about the performance of fork() on EC2? I've seen fork() take 200 ms in a Xen VM.
I agree with this design; the preforking was basically vestigial and should have been removed. I'll review this PR later this week.
@davies I hadn't heard about this Xen fork() performance issue before; thanks for pointing it out! Both before and after this patch, we still fork() the same number of processes when launching a set of workers; the only difference here is that those fork()s are issued serially rather than in parallel. Do you know whether these fork()s benefit from a parallel speedup? The old code probably has a higher latency to launch the very first set of tasks, since it has to serially fork() the intermediate layer of processes that this patch removes. Keep in mind that we're fork()ing a process with an extremely small heap, so it may not actually be that expensive.
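For anyone who wants to check the Xen numbers on their own hardware, here's a quick micro-benchmark of raw fork() latency (a hypothetical snippet, unrelated to the PR's code):

```python
import os
import time

# Time N fork()+exit+reap round trips of a process with a tiny heap.
# On bare metal this is typically well under a millisecond per fork;
# a Xen guest may be much slower, as @davies observed.
N = 100
start = time.time()
for _ in range(N):
    pid = os.fork()
    if pid == 0:
        os._exit(0)  # child does nothing and exits immediately
    os.waitpid(pid, 0)
print("avg fork+reap: %.2f ms" % ((time.time() - start) / N * 1000.0))
```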
Another thing to test: a crash in a PySpark worker should never crash the daemon:

```python
>>> rdd = sc.parallelize(range(10000))
>>> rdd.count()  # Force the workers / daemon to launch
# Crash the workers abruptly, with no error-propagation back to the JVM
>>> import os
>>> rdd.mapPartitions(lambda x: os._exit(-1)).count()
```

This seems to be mis-handled in my current code, since a worker crash throws a RuntimeError from the SIGCHLD handler, killing the daemon.
QA tests have started for PR 1680. This patch merges cleanly.
QA results for PR 1680:
Currently, daemon.py forks a pool of numProcessors subprocesses, and those processes fork themselves again to create the actual Python worker processes that handle data. I think that this extra layer of indirection is unnecessary and adds a lot of complexity. This commit attempts to remove this middle layer of subprocesses by launching the workers directly from daemon.py. See mesos/spark#563 for the original PR that added daemon.py, where I raise some issues with the current design.
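In outline, the structure this commit moves to looks something like the following (a rough sketch under my reading of the description; `worker_main` and `launch_worker` are illustrative names, not the real daemon.py API):

```python
import os

def worker_main(conn):
    # Hypothetical worker entry point: process tasks arriving on conn.
    ...

def launch_worker(conn):
    # After this change, daemon.py fork()s each worker directly: one
    # listening daemon and N workers, with no intermediate pool layer.
    pid = os.fork()
    if pid == 0:           # child: become the worker and never return
        worker_main(conn)
        os._exit(0)
    return pid             # parent: remember the worker's pid
```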
After including these logging statements, orphaned workers might stay alive after the driver died. My current theory is that the print to sys.stderr failed because Java had closed the file, and the resulting exception managed to propagate to the uncaught exception handler, causing daemon.py to exit before it could send SIGHUP to its children.
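A sketch of that failure mode and the obvious guard (hypothetical; the function name and message are illustrative, not the committed code):

```python
import sys

def log_shutdown(reason):
    # If the JVM parent has already closed our stderr pipe, this write
    # raises IOError. If that exception reaches the uncaught-exception
    # handler, the daemon dies before it can SIGHUP its children, so
    # the error must be swallowed here.
    try:
        sys.stderr.write("daemon.py exiting: %s\n" % reason)
        sys.stderr.flush()
    except IOError:
        pass
```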
QA tests have started for PR 1680. This patch merges cleanly.

QA results for PR 1680:

QA tests have started for PR 1680. This patch merges cleanly.

QA results for PR 1680:

QA tests have started for PR 1680. This patch merges cleanly.

QA results for PR 1680:
@davies raised a good question about whether select.select() can return other errors here and whether we should try to handle those errors more gracefully. According to `man select`, an error return from select() indicates:

```
[EAGAIN]  The kernel was (perhaps temporarily) unable to allocate the
          requested number of file descriptors.
[EBADF]   One of the descriptor sets specified an invalid descriptor.
[EINTR]   A signal was delivered before the time limit expired and
          before any of the selected events occurred.
[EINVAL]  The specified time limit is invalid. One of its components
          is negative or too large.
[EINVAL]  ndfs is greater than FD_SETSIZE and _DARWIN_UNLIMITED_SELECT
          is not defined.
```

I think only EINTR is recoverable here. I've updated this code to use the EINTR constant instead of the magic number 4.
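The resulting pattern looks roughly like this (a sketch, not the exact patch; note that daemon.py at the time ran on Python 2, where select.error must be caught by hand, while Python 3.5+ retries EINTR automatically per PEP 475):

```python
import errno
import select

def accept_loop(listen_sock, should_exit):
    while not should_exit():
        try:
            ready, _, _ = select.select([listen_sock], [], [], 1.0)
        except select.error as ex:
            if ex.args[0] == errno.EINTR:
                continue  # a signal (e.g. SIGCHLD) interrupted select(); retry
            raise  # EBADF / EINVAL indicate real bugs; don't mask them
        # ... accept and handle any ready sockets ...
```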
In most cases, EAGAIN should be recoverable; should we also catch that?
Had an offline discussion with @davies, who suggested that we send SIGKILL instead of SIGHUP to kill the Python workers. This would prevent the workers from becoming zombie processes if they overrode the SIGHUP handlers or became deadlocked in C code and were unable to respond to signals. Any edge-cases in the SIGHUP handling here should also be present in the original version, so I'm inclined to merge this patch now and revisit the SIGKILL suggestion in a later patch. Also, we may want to take additional time to consider whether we want to support cleaner termination of workers (e.g. first attempt to shut them down gracefully […]).
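A sketch of the kind of two-phase shutdown being floated here (a hypothetical helper, not code from this patch):

```python
import os
import signal
import time

def terminate_workers(worker_pids, grace_seconds=5.0):
    # Phase 1: ask nicely. A worker can catch SIGHUP and clean up, but
    # one that overrides the handler or is wedged in C code never will.
    for pid in worker_pids:
        try:
            os.kill(pid, signal.SIGHUP)
        except OSError:
            pass  # already exited
    time.sleep(grace_seconds)
    # Phase 2: SIGKILL cannot be caught, blocked, or ignored, so any
    # survivors are removed unconditionally.
    for pid in worker_pids:
        try:
            os.kill(pid, signal.SIGKILL)
        except OSError:
            pass  # exited during the grace period
```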
QA tests have started for PR 1680. This patch merges cleanly.

QA results for PR 1680:
LGTM, merging into master. |
Currently, daemon.py forks a pool of numProcessors subprocesses, and those processes fork themselves again to create the actual Python worker processes that handle data. I think that this extra layer of indirection is unnecessary and adds a lot of complexity. This commit attempts to remove this middle layer of subprocesses by launching the workers directly from daemon.py. See mesos/spark#563 for the original PR that added daemon.py, where I raise some issues with the current design.

Author: Josh Rosen <joshrosen@apache.org>

Closes apache#1680 from JoshRosen/pyspark-daemon and squashes the following commits:

5abbcb9 [Josh Rosen] Replace magic number: 4 -> EINTR
5495dff [Josh Rosen] Throw IllegalStateException if worker launch fails.
b79254d [Josh Rosen] Detect failed fork() calls; improve error logging.
282c2c4 [Josh Rosen] Remove daemon.py exit logging, since it caused problems:
8554536 [Josh Rosen] Fix daemon’s shutdown(); log shutdown reason.
4e0fab8 [Josh Rosen] Remove shared-memory exit_flag; don't die on worker death.
e9892b4 [Josh Rosen] [WIP] [SPARK-2764] Simplify daemon.py process structure.
…pache#1680)

### What changes were proposed in this pull request?

rdar://105349722 (Backport SPARK-42357 Log `exitCode` when `SparkContext.stop` starts)

This is a revised backport of the community patch, because the `exitCode` patch doesn't exist in branch-3.2. However, it still provides a marker for `SparkContext.stop`.

This PR aims to log `exitCode` when `SparkContext.stop` starts, as a clear boundary past which the meaningless log messages from user jobs can be ignored.

### Why are the changes needed?

This PR adds the following log.

```
23/02/06 02:12:55 INFO SparkContext: SparkContext is stopping with exitCode 0.
```

In the simplest case, it stops like the following.

```
$ bin/spark-submit examples/src/main/python/pi.py
...
Pi is roughly 3.147080
23/02/06 02:12:55 INFO SparkContext: SparkContext is stopping with exitCode 0.
23/02/06 02:12:55 INFO AbstractConnector: Stopped Spark@1cb72b8{HTTP/1.1, (http/1.1)}{localhost:4040}
23/02/06 02:12:55 INFO SparkUI: Stopped Spark web UI at http://localhost:4040
23/02/06 02:12:55 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/02/06 02:12:55 INFO MemoryStore: MemoryStore cleared
23/02/06 02:12:55 INFO BlockManager: BlockManager stopped
23/02/06 02:12:55 INFO BlockManagerMaster: BlockManagerMaster stopped
23/02/06 02:12:55 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/02/06 02:12:55 INFO SparkContext: Successfully stopped SparkContext
23/02/06 02:12:56 INFO ShutdownHookManager: Shutdown hook called
```

However, in complex cases there are many, many logs after invoking `SparkContext.stop(0)`, which sometimes confuses users. The new log shows a clear boundary past which the messages can be ignored.

```
23/02/06 02:59:27 INFO TaskSetManager: Starting task 283.0 in stage 34.0 (TID 426) (172.31.218.234, executor 5, partition 283, PROCESS_LOCAL, 8001 bytes)
...
23/02/06 02:59:27 INFO BlockManagerInfo: Removed broadcast_35_piece0 on 172.31.218.244:41741 in memory (size: 5.7 KiB, free: 50.8 GiB)
...
23/02/06 02:59:27 INFO SparkUI: Stopped Spark web UI at http://r6i-16xlarge-3402-0203-apple-3-bf3f7e8624a90a37-driver-svc.default.svc:4040
...
23/02/06 02:59:27 INFO DAGScheduler: ShuffleMapStage 34 (q24a) failed in 0.103 s due to Stage cancelled because SparkContext was shut down
...
23/02/06 02:59:27 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
...
23/02/06 02:59:27 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
...
23/02/06 02:59:27 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message. org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
```

### Does this PR introduce _any_ user-facing change?

No, this is a log-only change.

### How was this patch tested?

Manually.