Skip to content

Commit

Permalink
Display parallel download progress
Browse files Browse the repository at this point in the history
  • Loading branch information
McSinyx committed Aug 19, 2020
1 parent 526226c commit 83ccf72
Showing 1 changed file with 64 additions and 5 deletions.
69 changes: 64 additions & 5 deletions src/pip/_internal/network/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
import logging
import mimetypes
import os
from contextlib import contextmanager
from functools import partial
from threading import Lock, Semaphore

from pip._vendor.requests.models import CONTENT_CHUNK_SIZE
from pip._vendor.six.moves import map
from pip._vendor.six.moves.queue import Queue

from pip._internal.cli.progress_bars import DownloadProgressProvider
from pip._internal.exceptions import NetworkConnectionError
Expand All @@ -27,7 +30,7 @@
from pip._internal.utils.typing import MYPY_CHECK_RUNNING

if MYPY_CHECK_RUNNING:
from typing import Iterable, List, Optional, Tuple
from typing import Callable, Iterable, Iterator, List, Optional, Tuple

from pip._vendor.requests.models import Response

Expand Down Expand Up @@ -229,16 +232,70 @@ def __init__(
self._session = session
self._progress_bar = progress_bar

self._queue = Queue() # type: Queue[bytes]
self._chunks = iter(()) # type: Iterator[bytes]
self._count = Semaphore(0) # number of downloading files
self._lock = Lock()

def _files_to_download(self, links, location):
# type: (Iterable[Link], str) -> Iterable[_FileToDownload]
return map(partial(_FileToDownload, location, self._session), links)

def _download_one(self, file):
# type: (_FileToDownload) -> Tuple[str, Tuple[str, str]]
@property
def _is_downloading(self):
# type: () -> bool
if self._count.acquire(blocking=False):
self._count.release()
return True
else:
return False

def _iter_chunks(self):
# type: () -> Iterable[bytes]
while self._is_downloading:
yield self._queue.get()

def _update_progress(self, chunk):
# type: (bytes) -> None
self._queue.put(chunk)
with self._lock:
next(self._chunks)

@contextmanager
def _progress(self, files):
# type: (List[_FileToDownload]) -> Iterator[Callable[[bytes], None]]
"""Provide a context manager of download progress.
__enter__ returns a routine to update such progress,
which is to be used as the first argument of self._download_one.
"""
total_size = sum(file.size for file in files if file.size is not None)
if _should_hide_progress(False, total_size):
yield lambda chunk: None
else:
assert not self._is_downloading
self._count = Semaphore(len(files))
progress = DownloadProgressProvider(self._progress_bar, total_size)
self._chunks = progress(self._iter_chunks())
try:
yield self._update_progress
finally:
# Trigger a call to the progress' .finish()
for _chunk in self._chunks:
pass

def _download_one(
self,
update, # type: Callable[[bytes], None]
file, # type: _FileToDownload
):
# type: (...) -> Tuple[str, Tuple[str, str]]
assert not file.is_cached
with open(file.path, 'wb') as content_file:
for chunk in file.chunks:
content_file.write(chunk)
update(chunk)
self._count.acquire()
return file.url, (file.path, file.type)

def __call__(self, links, location):
Expand All @@ -253,5 +310,7 @@ def __call__(self, links, location):
for result in map_multithread(_write_from_cache, cached_files):
yield result

for result in map_multithread(self._download_one, files):
yield result
with self._progress(files) as update:
download_one = partial(self._download_one, update)
for result in map_multithread(download_one, files):
yield result

0 comments on commit 83ccf72

Please sign in to comment.