Skip to content

Commit

Permalink
Drop parallel map for Python 2 and the non-lazy variant
Browse files Browse the repository at this point in the history
Co-authored-by: Pradyun Gedam <pradyunsg@gmail.com>
  • Loading branch information
McSinyx and pradyunsg committed Jun 22, 2020
1 parent 61d4a52 commit ec6e31e
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 153 deletions.
170 changes: 31 additions & 139 deletions src/pip/_internal/utils/parallel.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
"""Convenient parallelization of higher order functions.
This module provides proper fallback functions for multiprocess
and multithread map, both the non-lazy, ordered variant
and the lazy, unordered variant.
This module provides two helper functions, with appropriate fallbacks on
Python 2 and on systems lacking support for synchronization mechanisms:
- map_multiprocess
- map_multithread
These helpers work like Python 3's map, with two differences:
- They don't guarantee the order of processing of
the elements of the iterable.
- The underlying process/thread pools chop the iterable into
a number of chunks, so that for very long iterables using
a large value for chunksize can make the job complete much faster
than using the default value of 1.
"""

__all__ = ['map_multiprocess', 'imap_multiprocess',
'map_multithread', 'imap_multithread']
__all__ = ['map_multiprocess', 'map_multithread']

from contextlib import contextmanager
from multiprocessing import Pool as ProcessPool
Expand All @@ -19,8 +29,7 @@
from pip._internal.utils.typing import MYPY_CHECK_RUNNING

if MYPY_CHECK_RUNNING:
from typing import (
Callable, Iterable, Iterator, List, Optional, Union, TypeVar)
from typing import Callable, Iterable, Iterator, Union, TypeVar
from multiprocessing import pool

Pool = Union[pool.Pool, pool.ThreadPool]
Expand All @@ -43,87 +52,29 @@
@contextmanager
def closing(pool):
# type: (Pool) -> Iterator[Pool]
"""Return a context manager that closes and joins pool.
This is needed for Pool.imap* to make the result iterator iterate.
"""
"""Return a context manager making sure the pool closes properly."""
try:
yield pool
finally:
# For Pool.imap*, close and join are needed
# for the returned iterator to begin yielding.
pool.close()
pool.join()
pool.terminate()


def _map_fallback(func, iterable, chunksize=None):
# type: (Callable[[S], T], Iterable[S], Optional[int]) -> List[T]
"""Return a list of func applied to each element in iterable.
This function is the sequential fallback when sem_open is unavailable.
"""
return list(map(func, iterable))


def _imap_fallback(func, iterable, chunksize=1):
def _map_fallback(func, iterable, chunksize=1):
# type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
"""Make an iterator applying func to each element in iterable.
This function is the sequential fallback when sem_open is unavailable.
This function is the sequential fallback either on Python 2
where Pool.imap* doesn't react to KeyboardInterrupt
or when sem_open is unavailable.
"""
return map(func, iterable)


def _map_multiprocess_py2(func, iterable, chunksize=None):
    # type: (Callable[[S], T], Iterable[S], Optional[int]) -> List[T]
    """Chop iterable into chunks and submit them to a process pool.

    The (approximate) size of the chunks can be set via chunksize.
    The whole result list is materialized at once, so memory usage
    may be high for long iterables.  Results are returned in order.
    """
    pool = ProcessPool()
    try:
        # map_async + get(TIMEOUT) rather than a plain map so the
        # blocking wait stays interruptible on Python 2 — presumably
        # the KeyboardInterrupt issue noted elsewhere in this module.
        job = pool.map_async(func, iterable, chunksize)
        return job.get(TIMEOUT)
    finally:
        pool.terminate()


def _map_multiprocess_py3(func, iterable, chunksize=None):
# type: (Callable[[S], T], Iterable[S], Optional[int]) -> List[T]
"""Chop iterable into chunks and submit them to a process pool.
The (approximate) size of these chunks can be specified
by setting chunksize to a positive integer.
Note that this function may cause high memory usage
for long iterables.
Return a list of results in order.
"""
with ProcessPool() as pool:
return pool.map(func, iterable, chunksize)


def _imap_multiprocess_py2(func, iterable, chunksize=1):
    # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
    """Chop iterable into chunks and submit them to a process pool.

    For very long iterables a large chunksize can make the job finish
    much faster than the default value of 1.  Returns an iterator over
    the results, which are fully computed before being handed back.
    """
    pool = ProcessPool()
    try:
        # .get with a timeout keeps the wait interruptible on Python 2.
        batch = pool.map_async(func, iterable, chunksize)
        return iter(batch.get(TIMEOUT))
    finally:
        pool.terminate()


def _imap_multiprocess_py3(func, iterable, chunksize=1):
def _map_multiprocess(func, iterable, chunksize=1):
# type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
"""Chop iterable into chunks and submit them to a process pool.
Expand All @@ -132,62 +83,11 @@ def _imap_multiprocess_py3(func, iterable, chunksize=1):
Return an unordered iterator of the results.
"""
with ProcessPool() as pool, closing(pool):
with closing(ProcessPool()) as pool:
return pool.imap_unordered(func, iterable, chunksize)


def _map_multithread_py2(func, iterable, chunksize=None):
    # type: (Callable[[S], T], Iterable[S], Optional[int]) -> List[T]
    """Chop iterable into chunks and submit them to a thread pool.

    The (approximate) size of the chunks can be set via chunksize.
    The whole result list is materialized at once, so memory usage
    may be high for long iterables.  Results are returned in order.
    """
    pool = ThreadPool(DEFAULT_POOLSIZE)
    try:
        # map_async + get(TIMEOUT) keeps the wait interruptible on Py2.
        job = pool.map_async(func, iterable, chunksize)
        return job.get(TIMEOUT)
    finally:
        pool.terminate()


def _map_multithread_py3(func, iterable, chunksize=None):
    # type: (Callable[[S], T], Iterable[S], Optional[int]) -> List[T]
    """Chop iterable into chunks and submit them to a thread pool.

    The (approximate) size of the chunks can be set via chunksize.
    The whole result list is materialized at once, so memory usage
    may be high for long iterables.  Results are returned in order.
    """
    pool = ThreadPool(DEFAULT_POOLSIZE)
    try:
        return pool.map(func, iterable, chunksize)
    finally:
        # Same cleanup as Pool.__exit__ performs for the `with` form.
        pool.terminate()


def _imap_multithread_py2(func, iterable, chunksize=1):
    # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
    """Chop iterable into chunks and submit them to a thread pool.

    For very long iterables a large chunksize can make the job finish
    much faster than the default value of 1.

    NOTE(review): despite the declared Iterator return type this hands
    back the fully materialized result list, exactly like
    _map_multithread_py2 — confirm whether iter(...) was intended.
    """
    pool = ThreadPool(DEFAULT_POOLSIZE)
    try:
        # .get with a timeout keeps the wait interruptible on Python 2.
        job = pool.map_async(func, iterable, chunksize)
        return job.get(TIMEOUT)
    finally:
        pool.terminate()


def _imap_multithread_py3(func, iterable, chunksize=1):
def _map_multithread(func, iterable, chunksize=1):
# type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
"""Chop iterable into chunks and submit them to a thread pool.
Expand All @@ -196,20 +96,12 @@ def _imap_multithread_py3(func, iterable, chunksize=1):
Return an unordered iterator of the results.
"""
with ThreadPool(DEFAULT_POOLSIZE) as pool, closing(pool):
with closing(ThreadPool(DEFAULT_POOLSIZE)) as pool:
return pool.imap_unordered(func, iterable, chunksize)


if LACK_SEM_OPEN:
if LACK_SEM_OPEN or PY2:
map_multiprocess = map_multithread = _map_fallback
imap_multiprocess = imap_multithread = _imap_fallback
elif PY2:
map_multiprocess = _map_multiprocess_py2
imap_multiprocess = _imap_multiprocess_py2
map_multithread = _map_multithread_py2
imap_multithread = _imap_multithread_py2
else:
map_multiprocess = _map_multiprocess_py3
imap_multiprocess = _imap_multiprocess_py3
map_multithread = _map_multithread_py3
imap_multithread = _imap_multithread_py3
map_multiprocess = _map_multiprocess
map_multithread = _map_multithread
22 changes: 8 additions & 14 deletions tests/unit/test_utils_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@

DUNDER_IMPORT = '__builtin__.__import__' if PY2 else 'builtins.__import__'
FUNC, ITERABLE = factorial, range(42)
MAPS = ('map_multiprocess', 'imap_multiprocess',
'map_multithread', 'imap_multithread')
MAPS = 'map_multiprocess', 'map_multithread'
_import = __import__


def reload_parallel():
try:
del modules['pip._internal.utils.parallel']
finally:
return import_module('pip._internal.utils.parallel')
except KeyError:
pass
return import_module('pip._internal.utils.parallel')


def lack_sem_open(name, *args, **kwargs):
Expand All @@ -31,6 +31,8 @@ def lack_sem_open(name, *args, **kwargs):

def have_sem_open(name, *args, **kwargs):
    """Pretend the multiprocessing.synchronize import always succeeds."""
    if not name.endswith('synchronize'):
        return _import(name, *args, **kwargs)
    # Returning None instead of a module object is fine here: the tests
    # patched with this hook never actually use the pool, so nothing
    # ever touches the faked import's return value.
    return None
Expand All @@ -45,16 +47,15 @@ def test_lack_sem_open(name, monkeypatch):
"""
monkeypatch.setattr(DUNDER_IMPORT, lack_sem_open)
parallel = reload_parallel()
fallback = '_{}_fallback'.format(name.split('_')[0])
assert getattr(parallel, name) is getattr(parallel, fallback)
assert getattr(parallel, name) is parallel._map_fallback


@mark.parametrize('name', MAPS)
def test_have_sem_open(name, monkeypatch):
"""Test fallback when sem_open is available."""
monkeypatch.setattr(DUNDER_IMPORT, have_sem_open)
parallel = reload_parallel()
impl = ('_{}_py2' if PY2 else '_{}_py3').format(name)
impl = '_map_fallback' if PY2 else '_{}'.format(name)
assert getattr(parallel, name) is getattr(parallel, impl)


Expand All @@ -63,10 +64,3 @@ def test_map(name):
"""Test correctness of result of asynchronous maps."""
map_async = getattr(reload_parallel(), name)
assert set(map_async(FUNC, ITERABLE)) == set(map(FUNC, ITERABLE))


@mark.parametrize('name', ('map_multiprocess', 'map_multithread'))
def test_map_order(name):
    """Test result ordering of asynchronous maps."""
    map_async = getattr(reload_parallel(), name)
    expected = tuple(map(FUNC, ITERABLE))
    assert tuple(map_async(FUNC, ITERABLE)) == expected

0 comments on commit ec6e31e

Please sign in to comment.