Skip to content

Commit

Permalink
Wrap lazy map as well
Browse files Browse the repository at this point in the history
  • Loading branch information
McSinyx committed Jun 21, 2020
1 parent f9663b4 commit e1bac23
Show file tree
Hide file tree
Showing 2 changed files with 232 additions and 46 deletions.
216 changes: 183 additions & 33 deletions src/pip/_internal/utils/parallel.py
Original file line number Diff line number Diff line change
@@ -1,65 +1,215 @@
"""Convenient parallelization of higher order functions."""
"""Convenient parallelization of higher order functions.
__all__ = ['map_multiprocess', 'map_multithread']
This module provides proper fallback functions for multiprocess
and multithread map, both the non-lazy, ordered variant
and the lazy, unordered variant.
"""

__all__ = ['map_multiprocess', 'imap_multiprocess',
'map_multithread', 'imap_multithread']

from contextlib import contextmanager
from multiprocessing import Pool as ProcessPool
from multiprocessing.dummy import Pool as ThreadPool

from pip._vendor.requests.adapters import DEFAULT_POOLSIZE
from pip._vendor.six import PY2
from pip._vendor.six.moves import map

from pip._internal.utils.typing import MYPY_CHECK_RUNNING

if MYPY_CHECK_RUNNING:
from typing import Callable, Iterable, List, Optional, TypeVar
from typing import (
Callable, Iterable, Iterator, List, Optional, Union, TypeVar)
from multiprocessing import pool

Pool = Union[pool.Pool, pool.ThreadPool]
S = TypeVar('S')
T = TypeVar('T')

# On platforms without sem_open, multiprocessing[.dummy] Pool
# cannot be created.  Probe once at import time so the map/imap
# implementations below can be selected accordingly.
try:
    import multiprocessing.synchronize  # noqa
except ImportError:
    LACK_SEM_OPEN = True
else:
    LACK_SEM_OPEN = False

# Incredibly large timeout to work around bpo-8296 on Python 2:
# a bounded .get(timeout) keeps the wait interruptible by Ctrl-C,
# whereas an unbounded .get() would not be.
TIMEOUT = 2000000


@contextmanager
def closing(pool):
    # type: (Pool) -> Iterator[Pool]
    """Yield pool, then close and join it on exit.

    Pool.imap* return lazy iterators; closing and joining the pool
    is what drives them to completion, so callers returning such an
    iterator must wrap the pool in this context manager.
    """
    try:
        yield pool
    finally:
        pool.close()
        pool.join()


def _map_fallback(func, iterable, chunksize=None):
# type: (Callable[[S], T], Iterable[S], Optional[int]) -> List[T]
"""Return a list of func applied to each element in iterable.
This function is the sequential fallback when sem_open is unavailable.
"""
return list(map(func, iterable))


def _imap_fallback(func, iterable, chunksize=1):
# type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
"""Make an iterator applying func to each element in iterable.
This function is the sequential fallback when sem_open is unavailable.
"""
return map(func, iterable)


def _map_multiprocess_py2(func, iterable, chunksize=None):
    # type: (Callable[[S], T], Iterable[S], Optional[int]) -> List[T]
    """Chop iterable into chunks and submit them to a process pool.

    The (approximate) size of these chunks can be specified
    by setting chunksize to a positive integer.

    Note that this function may cause high memory usage
    for long iterables.

    Return a list of results in order.
    """
    pool = ProcessPool()
    try:
        # map_async + bounded get() keeps the wait interruptible
        # on Python 2 (bpo-8296).
        async_result = pool.map_async(func, iterable, chunksize)
        return async_result.get(TIMEOUT)
    finally:
        pool.terminate()


def _map_multiprocess_py3(func, iterable, chunksize=None):
# type: (Callable[[S], T], Iterable[S], Optional[int]) -> List[T]
"""Chop iterable into chunks and submit them to a process pool.
The (approximate) size of these chunks can be specified
by setting chunksize to a positive integer.
Note that this function may cause high memory usage
for long iterables.
Return a list of results in order.
"""
with ProcessPool() as pool:
return pool.map(func, iterable, chunksize)


def _imap_multiprocess_py2(func, iterable, chunksize=1):
    # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
    """Chop iterable into chunks and submit them to a process pool.

    For very long iterables using a large value for chunksize can make
    the job complete much faster than using the default value of 1.

    Return an unordered iterator of the results.
    """
    pool = ProcessPool()
    try:
        # Fully collect the results first (interruptible on Python 2
        # thanks to the bounded timeout), then expose them lazily.
        results = pool.map_async(func, iterable, chunksize).get(TIMEOUT)
        return iter(results)
    finally:
        pool.terminate()

def _imap_multiprocess_py3(func, iterable, chunksize=1):
    # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
    """Chop iterable into chunks and submit them to a process pool.

    For very long iterables using a large value for chunksize can make
    the job complete much faster than using the default value of 1.

    Return an unordered iterator of the results.
    """
    # closing(pool) joins the pool on exit, which is what forces
    # the lazy imap_unordered iterator to actually produce results.
    with ProcessPool() as pool, closing(pool):
        return pool.imap_unordered(func, iterable, chunksize)


def _map_multithread_py2(func, iterable, chunksize=None):
    # type: (Callable[[S], T], Iterable[S], Optional[int]) -> List[T]
    """Chop iterable into chunks and submit them to a thread pool.

    The (approximate) size of these chunks can be specified
    by setting chunksize to a positive integer.

    Note that this function may cause high memory usage
    for long iterables.

    Return a list of results in order.
    """
    pool = ThreadPool(DEFAULT_POOLSIZE)
    try:
        # map_async + bounded get() keeps the wait interruptible
        # on Python 2 (bpo-8296).
        return pool.map_async(func, iterable, chunksize).get(TIMEOUT)
    finally:
        pool.terminate()


def _map_multithread_py3(func, iterable, chunksize=None):
    # type: (Callable[[S], T], Iterable[S], Optional[int]) -> List[T]
    """Chop iterable into chunks and submit them to a thread pool.

    The (approximate) size of these chunks can be specified
    by setting chunksize to a positive integer.

    Note that this function may cause high memory usage
    for long iterables.

    Return a list of results in order.
    """
    with ThreadPool(DEFAULT_POOLSIZE) as pool:
        return pool.map(func, iterable, chunksize)


def _imap_multithread_py2(func, iterable, chunksize=1):
    # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
    """Chop iterable into chunks and submit them to a thread pool.

    For very long iterables using a large value for chunksize can make
    the job complete much faster than using the default value of 1.

    Return an unordered iterator of the results.
    """
    pool = ThreadPool(DEFAULT_POOLSIZE)
    try:
        # iter() mirrors _imap_multiprocess_py2 so all imap variants
        # expose a consistent iterator interface.
        return iter(pool.map_async(func, iterable, chunksize).get(TIMEOUT))
    finally:
        pool.terminate()


def _imap_multithread_py3(func, iterable, chunksize=1):
    # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
    """Chop iterable into chunks and submit them to a thread pool.

    For very long iterables using a large value for chunksize can make
    the job complete much faster than using the default value of 1.

    Return an unordered iterator of the results.
    """
    # closing(pool) joins the pool on exit, which drives the lazy
    # imap_unordered iterator; Pool.__exit__ then terminates it.
    pool = ThreadPool(DEFAULT_POOLSIZE)
    with pool, closing(pool):
        return pool.imap_unordered(func, iterable, chunksize)


# Select the public implementations once, at import time:
# - sequential fallbacks when sem_open is unavailable,
# - the interruptible map_async workarounds (bpo-8296) on Python 2,
# - the straightforward pool-based versions on Python 3.
if LACK_SEM_OPEN:
    map_multiprocess = map_multithread = _map_fallback
    imap_multiprocess = imap_multithread = _imap_fallback
elif PY2:
    map_multiprocess = _map_multiprocess_py2
    imap_multiprocess = _imap_multiprocess_py2
    map_multithread = _map_multithread_py2
    imap_multithread = _imap_multithread_py2
else:
    map_multiprocess = _map_multiprocess_py3
    imap_multiprocess = _imap_multiprocess_py3
    map_multithread = _map_multithread_py3
    imap_multithread = _imap_multithread_py3
62 changes: 49 additions & 13 deletions tests/unit/test_utils_parallel.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,72 @@
"""Test multiprocessing/multithreading higher-order functions."""

from importlib import import_module
from math import factorial
from sys import modules

from mock import patch
from pip._vendor.six import PY2
from pip._vendor.six.moves import map
from pytest import mark

from pip._internal.utils.parallel import map_multiprocess, map_multithread

# Patch target for the builtin __import__ hook (module differs on Python 2/3).
DUNDER_IMPORT = '__builtin__.__import__' if PY2 else 'builtins.__import__'
# A cheap, picklable function and input used by every map test below.
FUNC, ITERABLE = factorial, range(42)
# Names of the four public map variants exposed by the parallel module.
MAPS = ('map_multiprocess', 'imap_multiprocess',
        'map_multithread', 'imap_multithread')
# Reference to the real __import__, captured before any patching.
_import = __import__


def reload_parallel(name='pip._internal.utils.parallel'):
    """Force a fresh import of *name* and return the module.

    A fresh import is needed so that the module-level implementation
    selection in the parallel module reruns under the currently
    patched __import__.  *name* defaults to the parallel module but
    may be any importable module name.
    """
    # pop() with a default never raises when the module is absent;
    # the previous try/del with `return` in `finally` silently
    # swallowed any exception raised by the del (pylint W0150).
    modules.pop(name, None)
    return import_module(name)


def lack_sem_open(name, *args, **kwargs):
    """Raise ImportError on import of multiprocessing.synchronize.

    Drop-in replacement for __import__ that simulates a platform
    without sem_open; every other import is delegated unchanged.
    """
    if name.endswith('synchronize'):
        raise ImportError
    return _import(name, *args, **kwargs)


def have_sem_open(name, *args, **kwargs):
    """Make sure multiprocessing.synchronize import is successful.

    Drop-in replacement for __import__ that pretends sem_open exists
    by short-circuiting the synchronize import; every other import is
    delegated unchanged.
    """
    if not name.endswith('synchronize'):
        return _import(name, *args, **kwargs)
    # Returning None is enough: the caller only probes importability.
    return None


@mark.parametrize('name', MAPS)
def test_lack_sem_open(name, monkeypatch):
    """Test fallback when sem_open is not available.

    If so, multiprocessing[.dummy].Pool will fail to be created and
    map_async should fallback to map.
    """
    monkeypatch.setattr(DUNDER_IMPORT, lack_sem_open)
    parallel = reload_parallel()
    # e.g. 'map_multiprocess' -> '_map_fallback'.
    fallback = '_{}_fallback'.format(name.split('_')[0])
    assert getattr(parallel, name) is getattr(parallel, fallback)


@mark.parametrize('name', MAPS)
def test_have_sem_open(name, monkeypatch):
    """Test that the version-specific implementation is chosen
    when sem_open is available."""
    monkeypatch.setattr(DUNDER_IMPORT, have_sem_open)
    parallel = reload_parallel()
    suffix = 'py2' if PY2 else 'py3'
    impl = '_{}_{}'.format(name, suffix)
    assert getattr(parallel, name) is getattr(parallel, impl)


@mark.parametrize('name', MAPS)
def test_map(name):
    """Test correctness of result of asynchronous maps."""
    map_async = getattr(reload_parallel(), name)
    # Sets, since the imap variants make no ordering promise.
    expected = {FUNC(i) for i in ITERABLE}
    assert set(map_async(FUNC, ITERABLE)) == expected


@mark.parametrize('name', ('map_multiprocess', 'map_multithread'))
def test_map_order(name):
    """Test result ordering of the non-lazy, ordered map variants."""
    map_async = getattr(reload_parallel(), name)
    assert tuple(map_async(FUNC, ITERABLE)) == tuple(map(FUNC, ITERABLE))

0 comments on commit e1bac23

Please sign in to comment.