From f9aabb755ce4cb89a6dd926bd26a08611cfaf389 Mon Sep 17 00:00:00 2001
From: "Erik M. Bray"
Date: Fri, 15 Mar 2019 11:51:38 +0100
Subject: [PATCH 1/5] Trac #27490: Simplistic multiprocessing.Pool replacement
 for parallel docbuild on older Cygwin

---
 src/sage_setup/docbuild/__init__.py | 101 +++++++++++++++++++++-------
 1 file changed, 75 insertions(+), 26 deletions(-)

diff --git a/src/sage_setup/docbuild/__init__.py b/src/sage_setup/docbuild/__init__.py
index 3bf7f4fc35b..15097e67f8f 100644
--- a/src/sage_setup/docbuild/__init__.py
+++ b/src/sage_setup/docbuild/__init__.py
@@ -43,6 +43,7 @@

 import optparse, os, shutil, subprocess, sys, re
 import logging, warnings
+import time

 logger = logging.getLogger(__name__)

@@ -53,7 +54,7 @@
 import sage.all
 from sage.misc.cachefunc import cached_method
 from sage.misc.misc import sage_makedirs
-from sage.env import SAGE_DOC_SRC, SAGE_DOC, SAGE_SRC
+from sage.env import SAGE_DOC_SRC, SAGE_DOC, SAGE_SRC, CYGWIN_VERSION

 from .build_options import (LANGUAGES, SPHINXOPTS, PAPER, OMIT,
                             PAPEROPTS, ALLSPHINXOPTS, NUM_THREADS, WEBSITESPHINXOPTS,
@@ -264,29 +265,79 @@ def clean(self, *args):
 # import the customized builder for object.inv files
 inventory = builder_helper('inventory')

-def build_many(target, args):
-    # Pool() uses an actual fork() to run each new instance. This is important
-    # for performance reasons, i.e., don't use a forkserver when it becomes
-    # available with Python 3: Here, sage is already initialized which is quite
-    # costly, with a forkserver we would have to reinitialize it for every
-    # document we build. At the same time, don't serialize this by taking the
-    # pool (and thus the call to fork()) out completely: The call to Sphinx
-    # leaks memory, so we need to build each document in its own process to
-    # control the RAM usage.
-    from multiprocessing import Pool
-    pool = Pool(NUM_THREADS, maxtasksperchild=1)
-    # map_async handles KeyboardInterrupt correctly. Plain map and
-    # apply_async does not, so don't use it.
-    x = pool.map_async(target, args, 1)
-    try:
-        ret = x.get(99999)
-        pool.close()
-        pool.join()
-    except Exception:
-        pool.terminate()
-        if ABORT_ON_ERROR:
-            raise
-    return ret
+
+if not (CYGWIN_VERSION and CYGWIN_VERSION[0] < 3):
+    def build_many(target, args):
+        # Pool() uses an actual fork() to run each new instance. This is
+        # important for performance reasons, i.e., don't use a forkserver when
+        # it becomes available with Python 3: Here, sage is already initialized
+        # which is quite costly, with a forkserver we would have to
+        # reinitialize it for every document we build. At the same time, don't
+        # serialize this by taking the pool (and thus the call to fork()) out
+        # completely: The call to Sphinx leaks memory, so we need to build each
+        # document in its own process to control the RAM usage.
+        from multiprocessing import Pool
+        pool = Pool(NUM_THREADS, maxtasksperchild=1)
+        # map_async handles KeyboardInterrupt correctly. Plain map and
+        # apply_async do not, so don't use them.
+        x = pool.map_async(target, args, 1)
+        try:
+            ret = x.get(99999)
+            pool.close()
+            pool.join()
+        except Exception:
+            pool.terminate()
+            if ABORT_ON_ERROR:
+                raise
+        return ret
+else:
+    # Cygwin 64-bit < 3.0.0 has a bug with exception handling when exceptions
+    # occur in pthreads, so it's dangerous to use multiprocessing.Pool, as
+    # signals can't be properly handled in worker processes, and they can crash
+    # causing the docbuild to hang. But where are these pthreads, you ask?
+    # Well, multiprocessing.Pool runs a thread from which it starts new worker
+    # processes when old workers complete/die, so the worker processes behave
+    # as though they were started from a pthread, even after fork(), and are
+    # actually susceptible to this bug. As a workaround, here's a naïve but
+    # good-enough "pool" replacement that does not use threads; see
+    # https://trac.sagemath.org/ticket/27214#comment:25 for further discussion.
+    def build_many(target, args):
+        from multiprocessing import Process
+        workers = [None] * NUM_THREADS
+        queue = list(args)
+        jobs = {}
+
+        try:
+            while True:
+                for idx, w in enumerate(workers):
+                    if w and w.exitcode is not None:
+                        if w.exitcode != 0:
+                            raise RuntimeError(
+                                "worker for {} died with non-zero exit code "
+                                "{}".format(jobs[w.pid], w.exitcode))
+
+                        jobs.pop(w.pid)
+                        w = None
+
+                    if w is None:
+                        if queue:
+                            job = queue.pop(0)
+                            w = Process(target=target, args=(job,))
+                            w.start()
+                            jobs[w.pid] = job
+
+                    workers[idx] = w
+
+                if not any(filter(None, workers)):
+                    break
+
+                time.sleep(5)
+        finally:
+            for w in workers:
+                if w is not None:
+                    w.terminate()
+                    w.join()
+

 ##########################################
 #      Parallel Building Ref Manual      #
@@ -318,7 +369,6 @@ def _wrapper(self, name, *args, **kwds):
         This is the function which goes through all of the documents
         and does the actual building.
         """
-        import time
         start = time.time()
         docs = self.get_all_documents()
         refs = [x for x in docs if x.endswith('reference')]
@@ -811,7 +861,6 @@ def update_mtimes(self):
         """
         env = self.get_sphinx_environment()
         if env is not None:
-            import time
             for doc in env.all_docs:
                 env.all_docs[doc] = time.time()
             logger.info("Updated %d reST file mtimes", len(env.all_docs))

From 219e9c41e43036bed5d2a3f54d4684ee33aef06a Mon Sep 17 00:00:00 2001
From: "Erik M. Bray"
Date: Fri, 15 Mar 2019 16:19:45 +0100
Subject: [PATCH 2/5] A little bit of import cleanup

---
 src/sage_setup/docbuild/__init__.py | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/src/sage_setup/docbuild/__init__.py b/src/sage_setup/docbuild/__init__.py
index 15097e67f8f..988f4ff4d5d 100644
--- a/src/sage_setup/docbuild/__init__.py
+++ b/src/sage_setup/docbuild/__init__.py
@@ -41,9 +41,15 @@
 from __future__ import absolute_import, print_function
 from six.moves import range

-import optparse, os, shutil, subprocess, sys, re
-import logging, warnings
+import logging
+import optparse
+import os
+import re
+import shutil
+import subprocess
+import sys
 import time
+import warnings

 logger = logging.getLogger(__name__)
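
Note: the map_async idiom kept in the non-Cygwin branch of PATCH 1 is worth
seeing in isolation. As the comment there says, map_async plus get() with a
large timeout stays interruptible with Ctrl-C, while plain map and apply_async
do not. A minimal, self-contained sketch of that pattern (not part of the
patch; the slow `square` task is a hypothetical stand-in for a document
build)::

    import time
    from multiprocessing import Pool

    def square(n):
        time.sleep(0.1)  # stand-in for a slow Sphinx build
        return n * n

    if __name__ == '__main__':
        # One task per worker process, as in build_many above
        pool = Pool(4, maxtasksperchild=1)
        result = pool.map_async(square, range(8), 1)
        try:
            # The large timeout keeps get() interruptible with Ctrl-C
            print(result.get(99999))
            pool.close()
            pool.join()
        except Exception:
            pool.terminate()
            raise
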
Bray" Date: Fri, 15 Mar 2019 16:20:06 +0100 Subject: [PATCH 3/5] Trac #27490: Address some review comments and other cleanup: * Use os.wait() to block until completion of a worker process (any worker, or even some other arbitrary child process) rather than an arbitrary time.sleep loop * Abort the docbuild on error only if ABORT_ON_ERROR * Upon shutdown, terminate() each remaining worker first before join()ing them * Other minor code improvements and added more comments --- src/sage_setup/docbuild/__init__.py | 51 +++++++++++++++++++++-------- 1 file changed, 38 insertions(+), 13 deletions(-) diff --git a/src/sage_setup/docbuild/__init__.py b/src/sage_setup/docbuild/__init__.py index 988f4ff4d5d..e05ac4b389e 100644 --- a/src/sage_setup/docbuild/__init__.py +++ b/src/sage_setup/docbuild/__init__.py @@ -41,6 +41,7 @@ from __future__ import absolute_import, print_function from six.moves import range +import errno import logging import optparse import os @@ -311,13 +312,23 @@ def build_many(target, args): from multiprocessing import Process workers = [None] * NUM_THREADS queue = list(args) + + # Maps worker process PIDs to the name of the document it's working + # on (the argument it was passed). This is primarily used just for + # debugging/information purposes. jobs = {} try: while True: + # Check the status of each worker for idx, w in enumerate(workers): + # If a worker process exited, check its exit code; if it + # exited non-zero and ABORT_ON_ERROR is True (the default) + # raise a RuntimeError to stop the docbuild process (we + # still give other workers a chance to finish cleanly in + # the finally: block below). if w and w.exitcode is not None: - if w.exitcode != 0: + if w.exitcode != 0 and ABORT_ON_ERROR: raise RuntimeError( "worker for {} died with non-zero exit code " "{}".format(jobs[w.pid], w.exitcode)) @@ -325,24 +336,38 @@ def build_many(target, args): jobs.pop(w.pid) w = None - if w is None: - if queue: - job = queue.pop(0) - w = Process(target=target, args=(job,)) - w.start() - jobs[w.pid] = job + # Worker w is dead/not started, so start a new worker + # in its place with the next document from the queue + if w is None and queue: + job = queue.pop(0) + w = Process(target=target, args=(job,)) + w.start() + jobs[w.pid] = job workers[idx] = w - if not any(filter(None, workers)): + if all(w is None for w in workers): + # If all workers are dead and there are no more items to + # process in the queue then we are done break - time.sleep(5) + # Wait for a worker to finish (either successfully or with + # error). We ignore the return value for now and check all + # workers at the beginning of the loop. + try: + os.wait() + except OSError as exc: + # Ignore ECHILD meaning no more child processes; i.e. all + # workers are already complete. + if exc.errno != errno.ECHILD: + raise finally: - for w in workers: - if w is not None: - w.terminate() - w.join() + remaining_workers = [w for w in workers if w is not None] + for w in remaining_workers: + # Give any remaining workers a chance to shut down gracefully + w.terminate() + for w in remaining_workers: + w.join() ########################################## From 1e5b1f56b37e1fb9abdc75c5e3540fc137c5024c Mon Sep 17 00:00:00 2001 From: "Erik M. Bray" Date: Fri, 15 Mar 2019 18:13:19 +0100 Subject: [PATCH 4/5] Trac #27490: Further fixes in use of os.wait() multiprocessing.Process objects don't behave well if we wait() manually rather than let them call os.waitpid() on themselves: Otherwise they can't get their exit codes. 
From 1e5b1f56b37e1fb9abdc75c5e3540fc137c5024c Mon Sep 17 00:00:00 2001
From: "Erik M. Bray"
Date: Fri, 15 Mar 2019 18:13:19 +0100
Subject: [PATCH 4/5] Trac #27490: Further fixes in use of os.wait()

multiprocessing.Process objects don't behave well if we wait() manually
rather than letting them call os.waitpid() on themselves: otherwise they
can't get their exit codes. So we have to give them a little help here to
tell them they're already dead.
---
 src/sage_setup/docbuild/__init__.py | 88 +++++++++++++++++++++--------
 1 file changed, 64 insertions(+), 24 deletions(-)

diff --git a/src/sage_setup/docbuild/__init__.py b/src/sage_setup/docbuild/__init__.py
index e05ac4b389e..1fe2ff3f2bb 100644
--- a/src/sage_setup/docbuild/__init__.py
+++ b/src/sage_setup/docbuild/__init__.py
@@ -318,23 +318,63 @@ def build_many(target, args):
         # debugging/information purposes.
         jobs = {}

+        def bring_out_yer_dead(w, exitcode):
+            """
+            Handle a dead / completed worker. Raises RuntimeError if it
+            returned with a non-zero exit code.
+            """
+
+            if w is None or exitcode is None:
+                # I'm not dead yet! (or I haven't even been born yet)
+                return w
+
+            # Hack: If we wait()ed on this worker manually we have to tell it
+            # it's dead:
+            if w._popen.returncode is None:
+                w._popen.returncode = exitcode
+
+            if exitcode != 0 and ABORT_ON_ERROR:
+                raise RuntimeError(
+                    "worker for {} died with non-zero exit code "
+                    "{}".format(jobs[w.pid], w.exitcode))
+
+            jobs.pop(w.pid)
+            # Helps multiprocessing with some internal bookkeeping
+            w.join()
+
+            return None
+
+        def wait_for_one():
+            """Wait for a single process and return its pid and exit code."""
+            try:
+                pid, sts = os.wait()
+            except OSError as exc:
+                # No more processes to wait on if ECHILD
+                if exc.errno != errno.ECHILD:
+                    raise
+                else:
+                    return None, None
+
+            if os.WIFSIGNALED(sts):
+                exitcode = -os.WTERMSIG(sts)
+            else:
+                exitcode = os.WEXITSTATUS(sts)
+
+            return pid, exitcode
+
+        waited_pid = None
+        waited_exitcode = None
         try:
             while True:
                 # Check the status of each worker
                 for idx, w in enumerate(workers):
-                    # If a worker process exited, check its exit code; if it
-                    # exited non-zero and ABORT_ON_ERROR is True (the default)
-                    # raise a RuntimeError to stop the docbuild process (we
-                    # still give other workers a chance to finish cleanly in
-                    # the finally: block below).
-                    if w and w.exitcode is not None:
-                        if w.exitcode != 0 and ABORT_ON_ERROR:
-                            raise RuntimeError(
-                                "worker for {} died with non-zero exit code "
-                                "{}".format(jobs[w.pid], w.exitcode))
-
-                        jobs.pop(w.pid)
-                        w = None
+                    if w is not None:
+                        if w.pid == waited_pid:
+                            exitcode = waited_exitcode
+                        else:
+                            exitcode = w.exitcode
+
+                        w = bring_out_yer_dead(w, exitcode)

                     # Worker w is dead/not started, so start a new worker
                     # in its place with the next document from the queue
@@ -351,21 +391,21 @@ def build_many(target, args):
                 if all(w is None for w in workers):
                     # If all workers are dead and there are no more items to
                     # process in the queue then we are done
                     break

-                # Wait for a worker to finish (either successfully or with
-                # error). We ignore the return value for now and check all
-                # workers at the beginning of the loop.
-                try:
-                    os.wait()
-                except OSError as exc:
-                    # Ignore ECHILD meaning no more child processes; i.e. all
-                    # workers are already complete.
-                    if exc.errno != errno.ECHILD:
-                        raise
+                # We'll check each worker process against the returned
+                # pid back at the top of the `while True` loop. We also
+                # check any other processes that may have exited in the
+                # meantime
+                waited_pid, waited_exitcode = wait_for_one()
         finally:
             remaining_workers = [w for w in workers if w is not None]
             for w in remaining_workers:
                 # Give any remaining workers a chance to shut down gracefully
-                w.terminate()
+                try:
+                    w.terminate()
+                except OSError as exc:
+                    if exc.errno != errno.ESRCH:
+                        # Otherwise it was already dead so this was expected
+                        raise
             for w in remaining_workers:
                 w.join()
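
Note: wait_for_one() above decodes the raw status word from os.wait() into
the same convention multiprocessing uses for Process.exitcode, where a
negative value means death by a signal. Pulled out on its own, the decoding
looks like this sketch (assuming no stopped or continued children, as in the
patch)::

    import os

    def decode_wait_status(sts):
        # Death by signal is reported as a negative exit code, matching
        # multiprocessing.Process.exitcode; otherwise return the plain
        # exit status.
        if os.WIFSIGNALED(sts):
            return -os.WTERMSIG(sts)
        return os.WEXITSTATUS(sts)
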
Bray" Date: Tue, 19 Mar 2019 11:04:04 +0000 Subject: [PATCH 5/5] Trac #27490: Moved the alternate build_many implementation into a sage_setup.docbuild.utils module. Also refactored it a bit more, including adding specialized WorkerDiedException class in order to distinguish it better from some other RuntimeError. Added a couple doctest examples. --- src/sage_setup/docbuild/__init__.py | 102 +------------ src/sage_setup/docbuild/utils.py | 214 ++++++++++++++++++++++++++++ 2 files changed, 215 insertions(+), 101 deletions(-) create mode 100644 src/sage_setup/docbuild/utils.py diff --git a/src/sage_setup/docbuild/__init__.py b/src/sage_setup/docbuild/__init__.py index 1fe2ff3f2bb..4546f87cb23 100644 --- a/src/sage_setup/docbuild/__init__.py +++ b/src/sage_setup/docbuild/__init__.py @@ -41,7 +41,6 @@ from __future__ import absolute_import, print_function from six.moves import range -import errno import logging import optparse import os @@ -308,106 +307,7 @@ def build_many(target, args): # actually succeptible to this bug. As a workaround, here's a naïve but # good-enough "pool" replacement that does not use threads # https://trac.sagemath.org/ticket/27214#comment:25 for further discussion. - def build_many(target, args): - from multiprocessing import Process - workers = [None] * NUM_THREADS - queue = list(args) - - # Maps worker process PIDs to the name of the document it's working - # on (the argument it was passed). This is primarily used just for - # debugging/information purposes. - jobs = {} - - def bring_out_yer_dead(w, exitcode): - """ - Handle a dead / completed worker. Raises RuntimeError if it - returned with a non-zero exit code. - """ - - if w is None or exitcode is None: - # I'm not dead yet! (or I haven't even been born yet) - return w - - # Hack: If we wait()ed on this worker manually we have to tell it - # it's dead: - if w._popen.returncode is None: - w._popen.returncode = exitcode - - if exitcode != 0 and ABORT_ON_ERROR: - raise RuntimeError( - "worker for {} died with non-zero exit code " - "{}".format(jobs[w.pid], w.exitcode)) - - jobs.pop(w.pid) - # Helps multiprocessing with some internal bookkeeping - w.join() - - return None - - def wait_for_one(): - """Wait for a single process and return its pid and exit code.""" - try: - pid, sts = os.wait() - except OSError as exc: - # No more processes to wait on if ECHILD - if exc.errno != errno.ECHILD: - raise - else: - return None, None - - if os.WIFSIGNALED(sts): - exitcode = -os.WTERMSIG(sts) - else: - exitcode = os.WEXITSTATUS(sts) - - return pid, exitcode - - waited_pid = None - waited_exitcode = None - try: - while True: - # Check the status of each worker - for idx, w in enumerate(workers): - if w is not None: - if w.pid == waited_pid: - exitcode = waited_exitcode - else: - exitcode = w.exitcode - - w = bring_out_yer_dead(w, exitcode) - - # Worker w is dead/not started, so start a new worker - # in its place with the next document from the queue - if w is None and queue: - job = queue.pop(0) - w = Process(target=target, args=(job,)) - w.start() - jobs[w.pid] = job - - workers[idx] = w - - if all(w is None for w in workers): - # If all workers are dead and there are no more items to - # process in the queue then we are done - break - - # We'll check each worker process against the returned - # pid back at the top of the `while True` loop. 
-                # check any other processes that may have exited in the
-                # meantime
-                waited_pid, waited_exitcode = wait_for_one()
-        finally:
-            remaining_workers = [w for w in workers if w is not None]
-            for w in remaining_workers:
-                # Give any remaining workers a chance to shut down gracefully
-                try:
-                    w.terminate()
-                except OSError as exc:
-                    if exc.errno != errno.ESRCH:
-                        # Otherwise it was already dead so this was expected
-                        raise
-            for w in remaining_workers:
-                w.join()
+    from .utils import _build_many as build_many


 ##########################################
diff --git a/src/sage_setup/docbuild/utils.py b/src/sage_setup/docbuild/utils.py
new file mode 100644
index 00000000000..3f7142dc3db
--- /dev/null
+++ b/src/sage_setup/docbuild/utils.py
@@ -0,0 +1,214 @@
+"""Miscellaneous utilities for running the docbuilder."""
+
+import errno
+import os
+
+
+class WorkerDiedException(RuntimeError):
+    """Raised if a worker process dies unexpectedly."""
+
+
+def _build_many(target, args, processes=None):
+    """
+    Map a list of arguments in ``args`` to a single-argument target function
+    ``target`` in parallel using ``NUM_THREADS`` (or ``processes`` if given)
+    simultaneous processes.
+
+    This is a simplified version of ``multiprocessing.Pool.map`` from the
+    Python standard library which avoids a couple of its pitfalls. In
+    particular, it can abort (with a `RuntimeError`) without hanging if one of
+    the worker processes unexpectedly dies. It also avoids starting new
+    processes from a pthread, which is known to result in bugs on versions of
+    Cygwin prior to 3.0.0 (see
+    https://trac.sagemath.org/ticket/27214#comment:25).
+
+    On the other hand, unlike ``multiprocessing.Pool.map``, it does not return
+    a result. This is fine for the purpose of building multiple Sphinx
+    documents in parallel.
+
+    In the future this may be replaced by a generalized version of the more
+    robust parallel processing implementation from ``sage.doctest.forker``.
+
+    EXAMPLES::
+
+        sage: from sage_setup.docbuild.utils import _build_many
+        sage: def target(N):
+        ....:     import time
+        ....:     time.sleep(float(0.1))
+        ....:     print('Processed task %s' % N)
+        ....:
+        sage: _build_many(target, range(8), processes=8)
+        Processed task ...
+        Processed task ...
+        Processed task ...
+        Processed task ...
+        Processed task ...
+        Processed task ...
+        Processed task ...
+        Processed task ...
+
+    If one of the worker processes errors out from an unhandled exception, or
+    otherwise exits non-zero (e.g. killed by a signal), any in-progress tasks
+    will be completed gracefully, but then a `WorkerDiedException` is raised
+    and pending tasks are not started::
+
+        sage: def target(N):
+        ....:     import time
+        ....:     if N == 4:
+        ....:         # Task 4 is a poison pill
+        ....:         1 / 0
+        ....:     else:
+        ....:         time.sleep(0.5)
+        ....:         print('Processed task %s' % N)
+        ....:
+
+    Note: In practice this test might still show output from the other worker
+    processes before the poison-pill is executed. It may also display the
+    traceback from the failing process on stderr. However, due to how the
+    doctest runner works, the doctest will only expect the final exception::
+
+        sage: _build_many(target, range(8), processes=8)
+        Traceback (most recent call last):
+        ...
+        WorkerDiedException: worker for 4 died with non-zero exit code 1
+    """
+    from multiprocessing import Process
+    from .build_options import NUM_THREADS, ABORT_ON_ERROR
+
+    if processes is None:
+        processes = NUM_THREADS
+
+    workers = [None] * processes
+    queue = list(args)
+
+    # Maps worker process PIDs to the name of the document it's working
+    # on (the argument it was passed). This is primarily used just for
+    # debugging/information purposes.
+    jobs = {}
+
+    ### Utility functions ###
+
+    def bring_out_yer_dead(w, exitcode):
+        """
+        Handle a dead / completed worker. Raises WorkerDiedException if it
+        returned with a non-zero exit code.
+        """
+
+        if w is None or exitcode is None:
+            # I'm not dead yet! (or I haven't even been born yet)
+            return w
+
+        # Hack: If we wait()ed on this worker manually we have to tell it
+        # it's dead:
+        if w._popen.returncode is None:
+            w._popen.returncode = exitcode
+
+        if exitcode != 0 and ABORT_ON_ERROR:
+            raise WorkerDiedException(
+                "worker for {} died with non-zero exit code "
+                "{}".format(jobs[w.pid], w.exitcode))
+
+        jobs.pop(w.pid)
+        # Helps multiprocessing with some internal bookkeeping
+        w.join()
+
+        return None
+
+    def wait_for_one():
+        """Wait for a single process and return its pid and exit code."""
+        try:
+            pid, sts = os.wait()
+        except OSError as exc:
+            # No more processes to wait on if ECHILD
+            if exc.errno != errno.ECHILD:
+                raise
+            else:
+                return None, None
+
+        if os.WIFSIGNALED(sts):
+            exitcode = -os.WTERMSIG(sts)
+        else:
+            exitcode = os.WEXITSTATUS(sts)
+
+        return pid, exitcode
+
+    def reap_workers(waited_pid=None, waited_exitcode=None):
+        """
+        This is the main worker handling loop.
+
+        Checks if workers have completed their tasks and spawns new workers if
+        there are more tasks on the queue. Returns `False` if there is more
+        work to be done or `True` if the work is complete.
+
+        Raises a ``WorkerDiedException`` if a worker exits unexpectedly.
+        """
+
+        all_done = True
+
+        for idx, w in enumerate(workers):
+            if w is not None:
+                if w.pid == waited_pid:
+                    exitcode = waited_exitcode
+                else:
+                    exitcode = w.exitcode
+
+                w = bring_out_yer_dead(w, exitcode)
+
+            # Worker w is dead/not started, so start a new worker
+            # in its place with the next document from the queue
+            if w is None and queue:
+                job = queue.pop(0)
+                w = Process(target=target, args=(job,))
+                w.start()
+                jobs[w.pid] = job
+
+            workers[idx] = w
+
+            if w is not None:
+                all_done = False
+
+        # If all workers are dead and there are no more items to
+        # process in the queue then we are done
+        return all_done
+
+    ### Main loop ###
+
+    waited_pid = None  # Set along with waited_exitcode by calls to
+                       # wait_for_one()
+    waited_exitcode = None
+    worker_exc = None  # Set to a WorkerDiedException if one occurs
+
+    try:
+        while True:
+            # Check the status of each worker and break out of the loop if
+            # all work is done.
+            # We'll check each worker process against the returned
+            # pid back at the top of the `while True` loop. We also
+            # check any other processes that may have exited in the
+            # meantime
+            try:
+                if reap_workers(waited_pid, waited_exitcode):
+                    break
+            except WorkerDiedException as exc:
+                worker_exc = exc
+                break
+
+            waited_pid, waited_exitcode = wait_for_one()
+    finally:
+        try:
+            remaining_workers = [w for w in workers if w is not None]
+            for w in remaining_workers:
+                # Give any remaining workers a chance to shut down gracefully
+                try:
+                    w.terminate()
+                except OSError as exc:
+                    if exc.errno != errno.ESRCH:
+                        # Otherwise it was already dead so this was expected
+                        raise
+            for w in remaining_workers:
+                w.join()
+        finally:
+            if worker_exc is not None:
+                # Re-raise the WorkerDiedException from bring_out_yer_dead
+                # if a worker died unexpectedly
+                raise worker_exc
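
Note: with the final patch applied, the docbuild imports _build_many as its
build_many, so callers see the same single-argument-target interface as
before, with an optional `processes` override. A usage sketch (the
`build_doc` target here is hypothetical; in the docbuild proper the target is
the document-building callable passed to build_many)::

    from sage_setup.docbuild.utils import _build_many, WorkerDiedException

    def build_doc(name):
        # Stand-in for the real per-document Sphinx invocation
        print('building %s' % name)

    if __name__ == '__main__':
        try:
            _build_many(build_doc, ['tutorial', 'faq', 'reference'],
                        processes=2)
        except WorkerDiedException as exc:
            print('docbuild aborted: %s' % exc)
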