Trac #27490: Simplistic multiprocessing.Pool replacement for parallel docbuild on older Cygwin

For Cygwin versions less than 3.0.0 (and only Cygwin), this replaces the
use of `multiprocessing.Pool` in the `sage_setup.docbuild.build_many`
function with a naïve but "good enough" (adequate for this purpose) parallel
process pool that does not rely on starting processes from threads.

This is needed for #27214, because the specific combination of
`MAP_NORESERVE` `mmap`s and forking processes from a thread can trigger a
bug in Cygwin (fixed in 3.0.0) that causes unhandled segfaults in any code
run during the docbuild that uses libgap.

In short, this is only needed so that, once #27214 is applied, the docs can
continue to be built on systems (including my primary development
environment, as well as the buildbot) that do not yet have Cygwin >= 3.0.0.
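
For illustration, the difference between the two approaches comes down to which thread ends up calling fork(): multiprocessing.Pool replenishes its workers from an internal handler thread (with maxtasksperchild=1 that means most of them), while the replacement pool forks every worker directly from the main thread. A minimal, hypothetical sketch (build_one_doc and the document list are stand-ins, not names from the patch):

from multiprocessing import Pool, Process

def build_one_doc(name):
    # Hypothetical stand-in for the real per-document build function.
    print("building %s" % name)

if __name__ == '__main__':
    docs = ['reference', 'tutorial', 'faq']

    # Pool-based approach: Pool replenishes exhausted workers from an
    # internal handler thread, so with maxtasksperchild=1 most workers
    # end up forked from a pthread, the combination that trips the
    # Cygwin < 3.0.0 bug described above.
    pool = Pool(2, maxtasksperchild=1)
    pool.map(build_one_doc, docs)
    pool.close()
    pool.join()

    # Replacement approach: fork each worker Process directly from the
    # main thread (simplified; the real replacement also caps the number
    # of simultaneous workers and aborts if a worker dies).
    workers = [Process(target=build_one_doc, args=(doc,)) for doc in docs]
    for w in workers:
        w.start()
    for w in workers:
        w.join()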

URL: https://trac.sagemath.org/27490
Reported by: embray
Ticket author(s): Erik Bray
Reviewer(s): Jeroen Demeyer
Release Manager authored and vbraun committed Mar 19, 2019
2 parents b543728 + fe0e3ea commit 593f669
Showing 2 changed files with 262 additions and 28 deletions.
76 changes: 48 additions & 28 deletions src/sage_setup/docbuild/__init__.py
@@ -41,8 +41,15 @@
from __future__ import absolute_import, print_function
from six.moves import range

import optparse, os, shutil, subprocess, sys, re
import logging, warnings
import logging
import optparse
import os
import re
import shutil
import subprocess
import sys
import time
import warnings

logger = logging.getLogger(__name__)

@@ -53,7 +60,7 @@
import sage.all
from sage.misc.cachefunc import cached_method
from sage.misc.misc import sage_makedirs
from sage.env import SAGE_DOC_SRC, SAGE_DOC, SAGE_SRC
from sage.env import SAGE_DOC_SRC, SAGE_DOC, SAGE_SRC, CYGWIN_VERSION

from .build_options import (LANGUAGES, SPHINXOPTS, PAPER, OMIT,
PAPEROPTS, ALLSPHINXOPTS, NUM_THREADS, WEBSITESPHINXOPTS,
@@ -264,29 +271,44 @@ def clean(self, *args):
# import the customized builder for object.inv files
inventory = builder_helper('inventory')

def build_many(target, args):
    # Pool() uses an actual fork() to run each new instance. This is important
    # for performance reasons, i.e., don't use a forkserver when it becomes
    # available with Python 3: Here, sage is already initialized which is quite
    # costly, with a forkserver we would have to reinitialize it for every
    # document we build. At the same time, don't serialize this by taking the
    # pool (and thus the call to fork()) out completely: The call to Sphinx
    # leaks memory, so we need to build each document in its own process to
    # control the RAM usage.
    from multiprocessing import Pool
    pool = Pool(NUM_THREADS, maxtasksperchild=1)
    # map_async handles KeyboardInterrupt correctly. Plain map and
    # apply_async does not, so don't use it.
    x = pool.map_async(target, args, 1)
    try:
        ret = x.get(99999)
        pool.close()
        pool.join()
    except Exception:
        pool.terminate()
        if ABORT_ON_ERROR:
            raise
    return ret

if not (CYGWIN_VERSION and CYGWIN_VERSION[0] < 3):
    def build_many(target, args):
        # Pool() uses an actual fork() to run each new instance. This is
        # important for performance reasons, i.e., don't use a forkserver when
        # it becomes available with Python 3: Here, sage is already initialized
        # which is quite costly, with a forkserver we would have to
        # reinitialize it for every document we build. At the same time, don't
        # serialize this by taking the pool (and thus the call to fork()) out
        # completely: The call to Sphinx leaks memory, so we need to build each
        # document in its own process to control the RAM usage.
        from multiprocessing import Pool
        pool = Pool(NUM_THREADS, maxtasksperchild=1)
        # map_async handles KeyboardInterrupt correctly. Plain map and
        # apply_async do not, so don't use them.
        x = pool.map_async(target, args, 1)
        try:
            ret = x.get(99999)
            pool.close()
            pool.join()
        except Exception:
            pool.terminate()
            if ABORT_ON_ERROR:
                raise
        return ret
else:
    # Cygwin 64-bit < 3.0.0 has a bug with exception handling when exceptions
    # occur in pthreads, so it's dangerous to use multiprocessing.Pool, as
    # signals can't be properly handled in worker processes, and they can
    # crash, causing the docbuild to hang. But where are these pthreads, you
    # ask? Well, multiprocessing.Pool runs a thread from which it starts new
    # worker processes when old workers complete/die, so the worker processes
    # behave as though they were started from a pthread, even after fork(),
    # and are actually susceptible to this bug. As a workaround, here's a
    # naïve but good-enough "pool" replacement that does not use threads; see
    # https://trac.sagemath.org/ticket/27214#comment:25 for further discussion.
    from .utils import _build_many as build_many
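
Either branch leaves the docbuild with the same build_many(target, args) interface. A minimal usage sketch (build_document and the document names are hypothetical; the real docbuild supplies its own target callable and argument list):

def build_document(name):
    # Hypothetical per-document build step.
    print("Building document %s" % name)

build_many(build_document, ['reference', 'tutorial', 'faq'])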


##########################################
# Parallel Building Ref Manual #
@@ -318,7 +340,6 @@ def _wrapper(self, name, *args, **kwds):
This is the function which goes through all of the documents
and does the actual building.
"""
import time
start = time.time()
docs = self.get_all_documents()
refs = [x for x in docs if x.endswith('reference')]
@@ -811,7 +832,6 @@ def update_mtimes(self):
"""
env = self.get_sphinx_environment()
if env is not None:
import time
for doc in env.all_docs:
env.all_docs[doc] = time.time()
logger.info("Updated %d reST file mtimes", len(env.all_docs))
214 changes: 214 additions & 0 deletions src/sage_setup/docbuild/utils.py
@@ -0,0 +1,214 @@
"""Miscellaneous utilities for running the docbuilder."""

import errno
import os


class WorkerDiedException(RuntimeError):
    """Raised if a worker process dies unexpectedly."""


def _build_many(target, args, processes=None):
"""
Map a list of arguments in ``args`` to a single-argument target function
``target`` in parallel using ``NUM_THREADS`` (or ``processes`` if given)
simultaneous processes.
This is a simplified version of ``multiprocessing.Pool.map`` from the
Python standard library which avoids a couple of its pitfalls. In
particular, it can abort (with a `RuntimeError`) without hanging if one of
the worker processes unexpectedly dies. It also avoids starting new
processes from a pthread, which is known to result in bugs on versions of
Cygwin prior to 3.0.0 (see
https://trac.sagemath.org/ticket/27214#comment:25).
On the other hand, unlike ``multiprocessing.Pool.map`` it does not return
a result. This is fine for the purpose of building multiple Sphinx
documents in parallel.
In the future this may be replaced by a generalized version of the more
robust parallel processing implementation from ``sage.doctest.forker``.
EXAMPLES::
sage: from sage_setup.docbuild.utils import _build_many
sage: def target(N):
....: import time
....: time.sleep(float(0.1))
....: print('Processed task %s' % N)
....:
sage: _build_many(target, range(8), processes=8)
Processed task ...
Processed task ...
Processed task ...
Processed task ...
Processed task ...
Processed task ...
Processed task ...
Processed task ...
If one of the worker processes errors out from an unhandled exception, or
otherwise exits non-zero (e.g. killed by a signal) any in-progress tasks
will be completed gracefully, but then a `RuntimeError` is raised and
pending tasks are not started::
sage: def target(N):
....: import time
....: if N == 4:
....: # Task 4 is a poison pill
....: 1 / 0
....: else:
....: time.sleep(0.5)
....: print('Processed task %s' % N)
....:
Note: In practice this test might still show output from the other worker
processes before the poison-pill is executed. It may also display the
traceback from the failing process on stderr. However, due to how the
doctest runner works, the doctest will only expect the final exception::
sage: _build_many(target, range(8), processes=8)
Traceback (most recent call last):
...
WorkerDiedException: worker for 4 died with non-zero exit code 1
"""
    from multiprocessing import Process
    from .build_options import NUM_THREADS, ABORT_ON_ERROR

    if processes is None:
        processes = NUM_THREADS

    workers = [None] * processes
    queue = list(args)

    # Maps worker process PIDs to the name of the document it's working
    # on (the argument it was passed). This is primarily used just for
    # debugging/information purposes.
    jobs = {}

    ### Utility functions ###

    def bring_out_yer_dead(w, exitcode):
        """
        Handle a dead / completed worker. Raises ``WorkerDiedException`` if
        it exited with a non-zero exit code.
        """

        if w is None or exitcode is None:
            # I'm not dead yet! (or I haven't even been born yet)
            return w

        # Hack: If we wait()ed on this worker manually we have to tell it
        # it's dead:
        if w._popen.returncode is None:
            w._popen.returncode = exitcode

        if exitcode != 0 and ABORT_ON_ERROR:
            raise WorkerDiedException(
                "worker for {} died with non-zero exit code "
                "{}".format(jobs[w.pid], w.exitcode))

        jobs.pop(w.pid)
        # Helps multiprocessing with some internal bookkeeping
        w.join()

        return None

    def wait_for_one():
        """Wait for a single process and return its pid and exit code."""
        try:
            pid, sts = os.wait()
        except OSError as exc:
            # No more processes to wait on if ECHILD
            if exc.errno != errno.ECHILD:
                raise
            else:
                return None, None

        if os.WIFSIGNALED(sts):
            exitcode = -os.WTERMSIG(sts)
        else:
            exitcode = os.WEXITSTATUS(sts)

        return pid, exitcode

    def reap_workers(waited_pid=None, waited_exitcode=None):
        """
        This is the main worker handling loop.

        Checks if workers have completed their tasks and spawns new workers if
        there are more tasks on the queue. Returns `False` if there is more
        work to be done or `True` if the work is complete.

        Raises a ``WorkerDiedException`` if a worker exits unexpectedly.
        """

        all_done = True

        for idx, w in enumerate(workers):
            if w is not None:
                if w.pid == waited_pid:
                    exitcode = waited_exitcode
                else:
                    exitcode = w.exitcode

                w = bring_out_yer_dead(w, exitcode)

            # Worker w is dead/not started, so start a new worker
            # in its place with the next document from the queue
            if w is None and queue:
                job = queue.pop(0)
                w = Process(target=target, args=(job,))
                w.start()
                jobs[w.pid] = job

            workers[idx] = w

            if w is not None:
                all_done = False

        # If all workers are dead and there are no more items to
        # process in the queue then we are done
        return all_done

    ### Main loop ###

    waited_pid = None  # Set along with waited_exitcode by calls to
                       # wait_for_one()
    waited_exitcode = None
    worker_exc = None  # Set to a WorkerDiedException if one occurs

    try:
        while True:
            # Check the status of each worker and break out of the loop if
            # all work is done.
            # We'll check each worker process against the returned
            # pid back at the top of the `while True` loop. We also
            # check any other processes that may have exited in the
            # meantime
            try:
                if reap_workers(waited_pid, waited_exitcode):
                    break
            except WorkerDiedException as exc:
                worker_exc = exc
                break

            waited_pid, waited_exitcode = wait_for_one()
    finally:
        try:
            remaining_workers = [w for w in workers if w is not None]
            for w in remaining_workers:
                # Give any remaining workers a chance to shut down gracefully
                try:
                    w.terminate()
                except OSError as exc:
                    if exc.errno != errno.ESRCH:
                        # Otherwise it was already dead so this was expected
                        raise
            for w in remaining_workers:
                w.join()
        finally:
            if worker_exc is not None:
                # Re-raise the WorkerDiedException set by bring_out_yer_dead
                # if a worker died unexpectedly
                raise worker_exc
