diff --git a/src/pip/_internal/utils/parallel.py b/src/pip/_internal/utils/parallel.py index 7e60d300b88..9fe1fe8b9e4 100644 --- a/src/pip/_internal/utils/parallel.py +++ b/src/pip/_internal/utils/parallel.py @@ -1,12 +1,22 @@ """Convenient parallelization of higher order functions. -This module provides proper fallback functions for multiprocess -and multithread map, both the non-lazy, ordered variant -and the lazy, unordered variant. +This module provides two helper functions, with appropriate fallbacks on +Python 2 and on systems lacking support for synchronization mechanisms: + +- map_multiprocess +- map_multithread + +These helpers work like Python 3's map, with two differences: + +- They don't guarantee the order of processing of + the elements of the iterable. +- The underlying process/thread pools chop the iterable into + a number of chunks, so that for very long iterables using + a large value for chunksize can make the job complete much faster + than using the default value of 1. """ -__all__ = ['map_multiprocess', 'imap_multiprocess', - 'map_multithread', 'imap_multithread'] +__all__ = ['map_multiprocess', 'map_multithread'] from contextlib import contextmanager from multiprocessing import Pool as ProcessPool @@ -19,8 +29,7 @@ from pip._internal.utils.typing import MYPY_CHECK_RUNNING if MYPY_CHECK_RUNNING: - from typing import ( - Callable, Iterable, Iterator, List, Optional, Union, TypeVar) + from typing import Callable, Iterable, Iterator, Union, TypeVar from multiprocessing import pool Pool = Union[pool.Pool, pool.ThreadPool] @@ -43,87 +52,29 @@ @contextmanager def closing(pool): # type: (Pool) -> Iterator[Pool] - """Return a context manager that closes and joins pool. - - This is needed for Pool.imap* to make the result iterator iterate. - """ + """Return a context manager making sure the pool closes properly.""" try: yield pool finally: + # For Pool.imap*, close and join are needed + # for the returned iterator to begin yielding. pool.close() pool.join() + pool.terminate() -def _map_fallback(func, iterable, chunksize=None): - # type: (Callable[[S], T], Iterable[S], Optional[int]) -> List[T] - """Return a list of func applied to each element in iterable. - - This function is the sequential fallback when sem_open is unavailable. - """ - return list(map(func, iterable)) - - -def _imap_fallback(func, iterable, chunksize=1): +def _map_fallback(func, iterable, chunksize=1): # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T] """Make an iterator applying func to each element in iterable. - This function is the sequential fallback when sem_open is unavailable. + This function is the sequential fallback either on Python 2 + where Pool.imap* doesn't react to KeyboardInterrupt + or when sem_open is unavailable. """ return map(func, iterable) -def _map_multiprocess_py2(func, iterable, chunksize=None): - # type: (Callable[[S], T], Iterable[S], Optional[int]) -> List[T] - """Chop iterable into chunks and submit them to a process pool. - - The (approximate) size of these chunks can be specified - by setting chunksize to a positive integer. - - Note that this function may cause high memory usage - for long iterables. - - Return a list of results in order. - """ - pool = ProcessPool() - try: - return pool.map_async(func, iterable, chunksize).get(TIMEOUT) - finally: - pool.terminate() - - -def _map_multiprocess_py3(func, iterable, chunksize=None): - # type: (Callable[[S], T], Iterable[S], Optional[int]) -> List[T] - """Chop iterable into chunks and submit them to a process pool. - - The (approximate) size of these chunks can be specified - by setting chunksize to a positive integer. - - Note that this function may cause high memory usage - for long iterables. - - Return a list of results in order. - """ - with ProcessPool() as pool: - return pool.map(func, iterable, chunksize) - - -def _imap_multiprocess_py2(func, iterable, chunksize=1): - # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T] - """Chop iterable into chunks and submit them to a process pool. - - For very long iterables using a large value for chunksize can make - the job complete much faster than using the default value of 1. - - Return an unordered iterator of the results. - """ - pool = ProcessPool() - try: - return iter(pool.map_async(func, iterable, chunksize).get(TIMEOUT)) - finally: - pool.terminate() - - -def _imap_multiprocess_py3(func, iterable, chunksize=1): +def _map_multiprocess(func, iterable, chunksize=1): # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T] """Chop iterable into chunks and submit them to a process pool. @@ -132,62 +83,11 @@ def _imap_multiprocess_py3(func, iterable, chunksize=1): Return an unordered iterator of the results. """ - with ProcessPool() as pool, closing(pool): + with closing(ProcessPool()) as pool: return pool.imap_unordered(func, iterable, chunksize) -def _map_multithread_py2(func, iterable, chunksize=None): - # type: (Callable[[S], T], Iterable[S], Optional[int]) -> List[T] - """Chop iterable into chunks and submit them to a thread pool. - - The (approximate) size of these chunks can be specified - by setting chunksize to a positive integer. - - Note that this function may cause high memory usage - for long iterables. - - Return a list of results in order. - """ - pool = ThreadPool(DEFAULT_POOLSIZE) - try: - return pool.map_async(func, iterable, chunksize).get(TIMEOUT) - finally: - pool.terminate() - - -def _map_multithread_py3(func, iterable, chunksize=None): - # type: (Callable[[S], T], Iterable[S], Optional[int]) -> List[T] - """Chop iterable into chunks and submit them to a thread pool. - - The (approximate) size of these chunks can be specified - by setting chunksize to a positive integer. - - Note that this function may cause high memory usage - for long iterables. - - Return a list of results in order. - """ - with ThreadPool(DEFAULT_POOLSIZE) as pool: - return pool.map(func, iterable, chunksize) - - -def _imap_multithread_py2(func, iterable, chunksize=1): - # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T] - """Chop iterable into chunks and submit them to a thread pool. - - For very long iterables using a large value for chunksize can make - the job complete much faster than using the default value of 1. - - Return an unordered iterator of the results. - """ - pool = ThreadPool(DEFAULT_POOLSIZE) - try: - return pool.map_async(func, iterable, chunksize).get(TIMEOUT) - finally: - pool.terminate() - - -def _imap_multithread_py3(func, iterable, chunksize=1): +def _map_multithread(func, iterable, chunksize=1): # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T] """Chop iterable into chunks and submit them to a thread pool. @@ -196,20 +96,12 @@ def _imap_multithread_py3(func, iterable, chunksize=1): Return an unordered iterator of the results. """ - with ThreadPool(DEFAULT_POOLSIZE) as pool, closing(pool): + with closing(ThreadPool(DEFAULT_POOLSIZE)) as pool: return pool.imap_unordered(func, iterable, chunksize) -if LACK_SEM_OPEN: +if LACK_SEM_OPEN or PY2: map_multiprocess = map_multithread = _map_fallback - imap_multiprocess = imap_multithread = _imap_fallback -elif PY2: - map_multiprocess = _map_multiprocess_py2 - imap_multiprocess = _imap_multiprocess_py2 - map_multithread = _map_multithread_py2 - imap_multithread = _imap_multithread_py2 else: - map_multiprocess = _map_multiprocess_py3 - imap_multiprocess = _imap_multiprocess_py3 - map_multithread = _map_multithread_py3 - imap_multithread = _imap_multithread_py3 + map_multiprocess = _map_multiprocess + map_multithread = _map_multithread diff --git a/tests/unit/test_utils_parallel.py b/tests/unit/test_utils_parallel.py index 0afc824bcae..bf42f6bd9e4 100644 --- a/tests/unit/test_utils_parallel.py +++ b/tests/unit/test_utils_parallel.py @@ -10,16 +10,16 @@ DUNDER_IMPORT = '__builtin__.__import__' if PY2 else 'builtins.__import__' FUNC, ITERABLE = factorial, range(42) -MAPS = ('map_multiprocess', 'imap_multiprocess', - 'map_multithread', 'imap_multithread') +MAPS = 'map_multiprocess', 'map_multithread' _import = __import__ def reload_parallel(): try: del modules['pip._internal.utils.parallel'] - finally: - return import_module('pip._internal.utils.parallel') + except KeyError: + pass + return import_module('pip._internal.utils.parallel') def lack_sem_open(name, *args, **kwargs): @@ -31,6 +31,8 @@ def lack_sem_open(name, *args, **kwargs): def have_sem_open(name, *args, **kwargs): """Make sure multiprocessing.synchronize import is successful.""" + # We don't care about the return value + # since we don't use the pool with this import. if name.endswith('synchronize'): return return _import(name, *args, **kwargs) @@ -45,8 +47,7 @@ def test_lack_sem_open(name, monkeypatch): """ monkeypatch.setattr(DUNDER_IMPORT, lack_sem_open) parallel = reload_parallel() - fallback = '_{}_fallback'.format(name.split('_')[0]) - assert getattr(parallel, name) is getattr(parallel, fallback) + assert getattr(parallel, name) is parallel._map_fallback @mark.parametrize('name', MAPS) @@ -54,7 +55,7 @@ def test_have_sem_open(name, monkeypatch): """Test fallback when sem_open is available.""" monkeypatch.setattr(DUNDER_IMPORT, have_sem_open) parallel = reload_parallel() - impl = ('_{}_py2' if PY2 else '_{}_py3').format(name) + impl = '_map_fallback' if PY2 else '_{}'.format(name) assert getattr(parallel, name) is getattr(parallel, impl) @@ -63,10 +64,3 @@ def test_map(name): """Test correctness of result of asynchronous maps.""" map_async = getattr(reload_parallel(), name) assert set(map_async(FUNC, ITERABLE)) == set(map(FUNC, ITERABLE)) - - -@mark.parametrize('name', ('map_multiprocess', 'map_multithread')) -def test_map_order(name): - """Test result ordering of asynchronous maps.""" - map_async = getattr(reload_parallel(), name) - assert tuple(map_async(FUNC, ITERABLE)) == tuple(map(FUNC, ITERABLE))