Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add custom progress bar that supports parallel downloads #12404

Closed
wants to merge 19 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions news/12404.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add the PipProgress class, a rich progress bar supporting parallel downloads
324 changes: 323 additions & 1 deletion src/pip/_internal/cli/progress_bars.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,348 @@
import functools
import sys
from typing import Callable, Generator, Iterable, Iterator, Optional, Tuple
from logging import Logger
from typing import (
Any,
Callable,
Generator,
Iterable,
Iterator,
List,
Optional,
Tuple,
Union,
)

from pip._vendor.rich.console import (
Console,
ConsoleOptions,
RenderableType,
RenderResult,
)
from pip._vendor.rich.progress import (
BarColumn,
DownloadColumn,
FileSizeColumn,
Progress,
ProgressColumn,
SpinnerColumn,
Task,
TaskID,
TextColumn,
TimeElapsedColumn,
TimeRemainingColumn,
TransferSpeedColumn,
)
from pip._vendor.rich.progress_bar import ProgressBar
from pip._vendor.rich.segment import Segment
from pip._vendor.rich.text import Text

from pip._internal.cli.spinners import RateLimiter
from pip._internal.utils.logging import get_indentation

DownloadProgressRenderer = Callable[[Iterable[bytes]], Iterator[bytes]]


class RenderableLine:
"""
A wrapper for a single row, renderable by `Console` methods.
"""

def __init__(self, line_items: List[Union[Text, ProgressBar]]):
self.line_items = line_items

def __rich_console__(
self, console: Console, options: ConsoleOptions
) -> RenderResult:
for line_item in self.line_items:
segments = [
seg
for seg in line_item.__rich_console__(console, options)
if isinstance(seg, Segment) and seg.text != "\n"
]
yield from segments


class RenderableLines:
"""
A wrapper for multiple rows, renderable by `Console` methods.
"""

def __init__(self, lines: Iterable[RenderableLine]):
self.lines = lines

def __rich_console__(
self, console: Console, options: ConsoleOptions
) -> RenderResult:
for idx, line in enumerate(self.lines):
if idx != 0:
yield Segment.line() # Render newline
yield from line.__rich_console__(console, options)
yield Segment.line()


class PipProgress(Progress):
"""
Custom Progress bar for sequential downloads.
"""

def __init__(
self,
refresh_per_second: int,
progress_disabled: bool = False,
logger: Optional[Logger] = None,
) -> None:
super().__init__(refresh_per_second=refresh_per_second)
self.progress_disabled = progress_disabled
self.log_download_description = True
self.logger = logger

@classmethod
def get_default_columns(cls) -> Tuple[ProgressColumn, ...]:
"""
Get the default columns to use for the progress bar when the size of
the file is known.
"""
return (
TextColumn(" " * (get_indentation() + 3)),
BarColumn(),
DownloadColumn(),
TransferSpeedColumn(),
TextColumn("eta"),
TimeRemainingColumn(),
)

@classmethod
def get_indefinite_columns(cls) -> Tuple[ProgressColumn, ...]:
"""
Get the columns to use for the progress bar when the size of the file
is unknown.
"""
return (
TextColumn(" " * (get_indentation() + 3)),
SpinnerColumn("line", speed=1.5),
FileSizeColumn(),
TransferSpeedColumn(),
TimeElapsedColumn(),
)

def get_renderable(self) -> RenderableType:
"""
Get the renderable representation of the progress of all tasks
"""
renderables: List[RenderableLine] = []
for task in self.tasks:
if not task.visible:
continue
task_renderable = [x for x in self.make_task_group(task) if x is not None]
renderables.extend(task_renderable)
return RenderableLines(renderables)

def make_task_group(self, task: Task) -> Iterable[Optional[RenderableLine]]:
"""
Create a representation for a task, i.e. it's progress bar.

Parameters:
- task (Task): The task for which to generate the representation.

Returns:
- Iterable[Optional[RenderableLine]]: text representation of a Progress Bar,
"""

hide_progress = task.fields["hide_progress"]
if self.progress_disabled or hide_progress:
return (None,)
columns = (
self.columns if task.total is not None else self.get_indefinite_columns()
)
progress_row = self.make_task_row(columns, task)
return (progress_row,)

def make_task_row(
self, columns: Tuple[Union[str, ProgressColumn], ...], task: Task
) -> RenderableLine:
"""
Format the columns of a single row for the given task.

"""
row_values = [
(column.format(task=task) if isinstance(column, str) else column(task))
for column in columns
]
row = self.merge_text_objects(row_values)
return RenderableLine(row)

def merge_text_objects(
self, row: List[RenderableType]
) -> List[Union[Text, ProgressBar]]:
"""
Merge adjacent Text objects in the given row into a single Text object.
"""
merged_row: List[Union[Text, ProgressBar]] = []
markup_to_merge: List[str] = []
for item in row:
if isinstance(item, ProgressBar):
if markup_to_merge:
merged_text = Text.from_markup(" ".join(markup_to_merge))
merged_row.append(merged_text)
merged_row.append(item)
markup_to_merge = [" "]
elif isinstance(item, Text):
markup_to_merge.append(item.markup)
if markup_to_merge:
merged_markup = " ".join(markup_to_merge)
merged_row.append(Text.from_markup(merged_markup))
return merged_row

def add_task(
self,
description: str,
start: bool = True,
total: Optional[float] = 100.0,
completed: int = 0,
visible: bool = True,
**fields: Any,
) -> TaskID:
"""
Reimplementation of Progress.add_task with description logging

Adds a task to the progress bar and prints the `Collecting x` or
`Using Cached x` message

The logging feature is disabled for parallel progress bars. This is
because in order to keep the logged message adjacent to its related download
progress bar, it must be added into the rendering progress bar. But doing
so causes rendering issues specifically in Jupyter notebooks.

To prevent such rendering issues while maintaining compatibility with
the default behavior (i.e., sequential downloads), two separate strategies
are employed for rendering progress bars: one for sequential downloads
and another for parallel downloads.

For more information see PR #12404
"""
if visible and self.log_download_description and self.logger:
indentation = " " * get_indentation()
log_statement = f"{indentation}{description}"
self.logger.info(log_statement)
return super().add_task(
description=description,
start=start,
total=total,
visible=visible,
completed=completed,
**fields,
)


class PipParallelProgress(PipProgress):
def __init__(self, refresh_per_second: int, progress_disabled: bool = True):
super().__init__(
refresh_per_second=refresh_per_second, progress_disabled=progress_disabled
)
# Overrides behaviour of logging description on add_task from PipProgress
self.log_download_description = False

@classmethod
def get_description_columns(cls) -> Tuple[ProgressColumn, ...]:
"""
Get the columns to use for the log message, i.e. the task description
"""
# These columns will be the "Downloading"/"Using cached" message
# This message needs to be columns because,logging this message interferes
# with parallel progress bars, and if we want the message to remain next
# to the progress bar even when there are multiple tasks, then it needs
# to be a part of the progress bar
indentation = get_indentation()
if indentation:
return (
TextColumn(" " * get_indentation()),
TextColumn("{task.description}"),
)
return (TextColumn("{task.description}"),)

def make_task_group(self, task: Task) -> Iterable[Optional[RenderableLine]]:
"""
Create a representation for a task, including both the description row
and the progress row.

Parameters:
- task (Task): The task for which to generate the representation.

Returns:
- Iterable[Optional[RenderableLine]]: An Iterable containing the
description and progress rows,
"""
progress_row = super().make_task_group(task)

description_row = self.make_task_row(self.get_description_columns(), task)
return (description_row, *progress_row)

def sort_tasks(self) -> None:
"""
Sort tasks
Remove completed tasks and print them
"""
# Removal of completed tasks reduces the number of tasks to be rendered
tasks = []
for task_id in self._tasks:
task = self._tasks[task_id]
if task.finished and len(self._tasks) > 1:
# Remove and log the finished task if there are too many active
# tasks to reduce the number of things to be rendered
# If there are too many active tasks on screen rich renders the
# overflow as a ... at the bottom of the screen which makes it
# difficult for a user to see what's happening
# If we remove every task on completion, it adds an extra newline
# for sequential downloads due to self.live on __exit__
if task.visible:
task_group = [
x for x in self.make_task_group(task) if x is not None
]
self.console.print(RenderableLines(task_group))
else:
tasks.append((task_id, self._tasks[task_id]))
# Sorting by finished ensures that all active downloads remain together
tasks = sorted(tasks, key=lambda x: not x[1].finished)
self._tasks = dict(tasks)

def update(
self,
task_id: TaskID,
*,
total: Optional[float] = None,
completed: Optional[float] = None,
advance: Optional[float] = None,
description: Optional[str] = None,
visible: Optional[bool] = None,
refresh: bool = False,
**fields: Any,
) -> None:
"""
A copy of Progress' implementation of update, with sorting of
self.tasks when a task is completed
"""
with self._lock:
task = self._tasks[task_id]
initial_finish_time = task.finished_time
super().update(
task_id,
total=total,
completed=completed,
advance=advance,
description=description,
visible=visible,
refresh=refresh,
**fields,
)
# If at the start of the update, the finish time is None and after
# calling super.update the finish time is not None, it means the
# task was just finished
task = self._tasks[task_id]
if initial_finish_time is None and task.finished_time is not None:
self.sort_tasks()


def _rich_progress_bar(
iterable: Iterable[bytes],
*,
Loading