Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to show single progress bar for all workers #239

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions pandarallel/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any, Callable, Dict, Iterator, Optional, Tuple, Type, cast
from functools import partial

import dill
import pandas as pd
Expand Down Expand Up @@ -205,6 +206,7 @@ def parallelize_with_memory_file_system(
nb_requested_workers: int,
data_type: Type[DataType],
progress_bars_type: ProgressBarsType,
single_bar: bool
):
def closure(
data: Any,
Expand Down Expand Up @@ -239,7 +241,7 @@ def closure(

show_progress_bars = progress_bars_type != ProgressBarsType.No

progress_bars = get_progress_bars(progresses_length, show_progress_bars)
progress_bars = get_progress_bars(progresses_length, show_progress_bars, single_bar)
progresses = [0] * nb_workers
workers_status = [WorkerStatus.Running] * nb_workers

Expand Down Expand Up @@ -355,6 +357,7 @@ def parallelize_with_pipe(
nb_requested_workers: int,
data_type: Type[DataType],
progress_bars_type: ProgressBarsType,
single_bar: bool
):
def closure(
data: Any,
Expand Down Expand Up @@ -391,7 +394,7 @@ def closure(

show_progress_bars = progress_bars_type != ProgressBarsType.No

progress_bars = get_progress_bars(progresses_length, show_progress_bars)
progress_bars = get_progress_bars(progresses_length, show_progress_bars, single_bar)
progresses = [0] * nb_workers
workers_status = [WorkerStatus.Running] * nb_workers

Expand Down Expand Up @@ -455,6 +458,7 @@ def initialize(
shm_size_mb=None,
nb_workers=NB_PHYSICAL_CORES,
progress_bar=False,
single_bar=False,
verbose=2,
use_memory_fs: Optional[bool] = None,
) -> None:
Expand All @@ -470,6 +474,7 @@ def initialize(
if use_memory_fs
else parallelize_with_pipe
)
parallelize = partial(parallelize, single_bar=single_bar)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, instead of wrapping `parallelize` with `partial` here, it would be better to pass the argument from line 528 onward, as is currently done for the other arguments of `parallelize`. @nalepae, any opinion?


if use_memory_fs and not is_memory_fs_available:
raise SystemError("Memory file system is not available")
Expand Down
23 changes: 16 additions & 7 deletions pandarallel/progress_bars.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@ def is_notebook_lab() -> bool:


class ProgressBarsConsole(ProgressBars):
def __init__(self, maxs: List[int], show: bool) -> None:
def __init__(self, maxs: List[int], show: bool, single_bar=True) -> None:
self.__show = show
self.__single_bar = single_bar
if self.__single_bar:
maxs = [sum(maxs)]
self.__bars = [[0, max] for max in maxs]
self.__width = self.__get_width()

Expand Down Expand Up @@ -108,6 +111,8 @@ def update(self, values: List[int]) -> None:
if not self.__show:
return

if self.__single_bar:
values = [sum(values)]
for index, value in enumerate(values):
self.__bars[index][0] = value

Expand All @@ -119,7 +124,7 @@ def update(self, values: List[int]) -> None:


class ProgressBarsNotebookLab(ProgressBars):
def __init__(self, maxs: List[int], show: bool) -> None:
def __init__(self, maxs: List[int], show: bool, single_bar=True) -> None:
"""Initialization.
Positional argument:
maxs - List containing the max value of each progress bar
Expand All @@ -132,6 +137,9 @@ def __init__(self, maxs: List[int], show: bool) -> None:
from IPython.display import display
from ipywidgets import HBox, IntProgress, Label, VBox

self.__single_bar = single_bar
if self.__single_bar:
maxs = [sum(maxs)]
self.__bars = [
HBox(
[
Expand All @@ -151,7 +159,8 @@ def update(self, values: List[int]) -> None:
"""
if not self.__show:
return

if self.__single_bar:
values = [sum(values)]
for index, value in enumerate(values):
bar, label = self.__bars[index].children

Expand All @@ -167,18 +176,18 @@ def set_error(self, index: int) -> None:
"""Set a bar on error"""
if not self.__show:
return

if self.__single_bar: index = 0
bar, _ = self.__bars[index].children
bar.bar_style = "danger"


def get_progress_bars(
maxs: List[int], show
maxs: List[int], show, single_bar
) -> Union[ProgressBarsNotebookLab, ProgressBarsConsole]:
return (
ProgressBarsNotebookLab(maxs, show)
ProgressBarsNotebookLab(maxs, show, single_bar)
if is_notebook_lab()
else ProgressBarsConsole(maxs, show)
else ProgressBarsConsole(maxs, show, single_bar)
)


Expand Down
9 changes: 7 additions & 2 deletions tests/test_pandarallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ def use_memory_fs(request):
return request.param


@pytest.fixture(params=(False, True))
def single_bar(request):
    """Provide the ``single_bar`` flag, once as ``False`` and once as ``True``.

    The fixture is parametrized, so every test that requests it (directly or
    through another fixture) runs once per value.
    """
    flag = request.param
    return flag


@pytest.fixture(params=(RuntimeError, AttributeError, ZeroDivisionError))
def exception(request):
    """Provide one exception class per parametrized run.

    The classes supplied are ``RuntimeError``, ``AttributeError`` and
    ``ZeroDivisionError``, in that order.
    """
    exception_class = request.param
    return exception_class
Expand Down Expand Up @@ -158,9 +163,9 @@ def func(x):


@pytest.fixture
def pandarallel_init(progress_bar, use_memory_fs):
def pandarallel_init(progress_bar, single_bar, use_memory_fs):
pandarallel.initialize(
progress_bar=progress_bar, use_memory_fs=use_memory_fs, nb_workers=2
progress_bar=progress_bar, single_bar=single_bar, use_memory_fs=use_memory_fs, nb_workers=2
)


Expand Down