From 156ebf9bb9c2cc312d24247c9b94f72564922cae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nguy=E1=BB=85n=20Gia=20Phong?= Date: Wed, 19 Aug 2020 22:05:00 +0700 Subject: [PATCH] Display parallel download progress --- src/pip/_internal/network/download.py | 69 +++++++++++++++++++++++++-- 1 file changed, 64 insertions(+), 5 deletions(-) diff --git a/src/pip/_internal/network/download.py b/src/pip/_internal/network/download.py index edc00b69191..286635e2b1a 100644 --- a/src/pip/_internal/network/download.py +++ b/src/pip/_internal/network/download.py @@ -4,10 +4,13 @@ import logging import mimetypes import os +from contextlib import contextmanager from functools import partial +from threading import Lock, Semaphore from pip._vendor.requests.models import CONTENT_CHUNK_SIZE from pip._vendor.six.moves import map +from pip._vendor.six.moves.queue import Queue from pip._internal.cli.progress_bars import DownloadProgressProvider from pip._internal.exceptions import NetworkConnectionError @@ -19,7 +22,7 @@ from pip._internal.utils.typing import MYPY_CHECK_RUNNING if MYPY_CHECK_RUNNING: - from typing import Iterable, List, Optional, Tuple + from typing import Callable, Iterable, Iterator, List, Optional, Tuple from pip._vendor.requests.models import Response @@ -220,16 +223,70 @@ def __init__( self._session = session self._progress_bar = progress_bar + self._queue = Queue() # type: Queue[bytes] + self._chunks = iter(()) # type: Iterator[bytes] + self._count = Semaphore(0) # number of downloading files + self._lock = Lock() + def _files_to_download(self, links, location): # type: (Iterable[Link], str) -> Iterable[_FileToDownload] return map(partial(_FileToDownload, location, self._session), links) - def _download_one(self, file): - # type: (_FileToDownload) -> Tuple[str, Tuple[str, str]] + @property + def _is_downloading(self): + # type: () -> bool + if self._count.acquire(blocking=False): + self._count.release() + return True + else: + return False + + def _iter_chunks(self): + # type: () -> Iterable[bytes] + while self._is_downloading: + yield self._queue.get() + + def _update_progress(self, chunk): + # type: (bytes) -> None + self._queue.put(chunk) + with self._lock: + next(self._chunks) + + @contextmanager + def _progress(self, files): + # type: (List[_FileToDownload]) -> Iterator[Callable[[bytes], None]] + """Provide a context manager of download progress. + + Its __enter__() returns a routine to update such progress, + which is to be used as the first argument of self._download_one. + """ + total_size = sum(file.size for file in files if file.size is not None) + if _should_hide_progress(total_size): + yield lambda chunk: None + else: + assert not self._is_downloading + self._count = Semaphore(len(files)) + progress = DownloadProgressProvider(self._progress_bar, total_size) + self._chunks = progress(self._iter_chunks()) + yield self._update_progress + + # Trigger a call to the progress' .finish() at the end + # of the iterator. This is not wrapped in a finally block + # because we don't reach the end if an exception is raised. + next(self._chunks, None) + + def _download_one( + self, + update, # type: Callable[[bytes], None] + file, # type: _FileToDownload + ): + # type: (...) -> Tuple[str, Tuple[str, str]] assert not file.is_cached with open(file.path, 'wb') as content_file: for chunk in file.chunks: content_file.write(chunk) + update(chunk) + self._count.acquire() return file.url, (file.path, file.type) def __call__(self, links, location): @@ -244,5 +301,7 @@ def __call__(self, links, location): for result in map_multithread(_write_from_cache, cached_files): yield result - for result in map_multithread(self._download_one, files): - yield result + with self._progress(files) as update: + download_one = partial(self._download_one, update) + for result in map_multithread(download_one, files): + yield result