Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Data]Update Data progress bars to use row as the iteration unit #46699

Merged
merged 18 commits into from
Aug 2, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ def __init__(
self._in_task_submission_backpressure = False
self._metrics = OpRuntimeMetrics(self)
self._estimated_num_output_bundles = None
self._estimated_output_num_rows = None
self._execution_completed = False

def __reduce__(self):
Expand Down Expand Up @@ -276,6 +277,19 @@ def num_outputs_total(self) -> Optional[int]:
"""
return self._estimated_num_output_bundles

def num_output_rows_total(self) -> Optional[int]:
    """Return the estimated total number of rows this operator will output.

    ``None`` means no reasonable estimate is available (for example, no
    tasks have finished yet). The value may be an estimate derived from
    the consumption so far, which makes it suitable for progress reporting.

    Subclasses either override this method or keep
    ``self._estimated_output_num_rows`` up to date.
    """
    estimate = self._estimated_output_num_rows
    return estimate

def start(self, options: ExecutionOptions) -> None:
"""Called by the executor when execution starts for an operator.

Expand Down Expand Up @@ -459,8 +473,3 @@ def implements_accurate_memory_accounting(self) -> bool:
def supports_fusion(self) -> bool:
    """Report whether this operator can be fused with other operators.

    Returns:
        ``False`` in this implementation.
    """
    fusable = False
    return fusable

@property
def estimated_output_num_rows(self) -> Optional[int]:
"""Return the estimated number of output rows for this operator."""
return None
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,14 @@ def num_outputs_total(self) -> Optional[int]:
return self._cur_output_bundles
return self._estimated_num_output_bundles

def num_output_rows_total(self) -> Optional[int]:
    """Return the row total for this operator, capped at ``self._limit``.

    The result is the smaller of the limit and the first input's row
    total, or ``None`` when the input cannot provide a total yet.
    """
    upstream_rows = self.input_dependencies[0].num_output_rows_total()
    return None if upstream_rows is None else min(self._limit, upstream_rows)

def throttling_disabled(self) -> bool:
    """Report that throttling is disabled for this operator (always ``True``)."""
    disabled = True
    return disabled

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,10 +433,6 @@ def num_active_tasks(self) -> int:
# to reflect the actual data processing tasks.
return len(self._data_tasks)

@property
def estimated_output_num_rows(self) -> Optional[int]:
return getattr(self, "_estimated_output_num_rows", 0)


def _map_task(
map_transformer: MapTransformer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ def num_outputs_total(self) -> Optional[int]:
# so we can return the number of blocks from the input op.
return self.input_dependencies[0].num_outputs_total()

def num_output_rows_total(self) -> Optional[int]:
    """Return the first input's row total unchanged.

    This operator reports the same number of rows as its input, so the
    query is simply forwarded to the first input dependency.
    """
    upstream = self.input_dependencies[0]
    return upstream.num_output_rows_total()

def start(self, options: ExecutionOptions) -> None:
super().start(options)
# Force disable locality optimization.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ def num_outputs_total(self) -> Optional[int]:
num_outputs += input_num_outputs
return num_outputs

def num_output_rows_total(self) -> Optional[int]:
    """Return the sum of the row totals of all input dependencies.

    Returns ``None`` as soon as one input cannot provide a total; inputs
    after that one are not queried.
    """
    per_input_rows = []
    for upstream in self.input_dependencies:
        upstream_rows = upstream.num_output_rows_total()
        if upstream_rows is None:
            # One unknown input makes the overall total unknown.
            return None
        per_input_rows.append(upstream_rows)
    return sum(per_input_rows)

def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
assert not self.completed()
assert 0 <= input_index <= len(self._input_dependencies), input_index
Expand Down
10 changes: 10 additions & 0 deletions python/ray/data/_internal/execution/operators/zip_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ def num_outputs_total(self) -> Optional[int]:
else:
return right_num_outputs

def num_output_rows_total(self) -> Optional[int]:
    """Return the row total derived from the two inputs.

    When both inputs report a total, the larger one is returned. When
    only one does, that one is returned. ``None`` is returned when
    neither input can provide a total.
    """
    lhs_rows = self.input_dependencies[0].num_output_rows_total()
    rhs_rows = self.input_dependencies[1].num_output_rows_total()
    if lhs_rows is None:
        # Fall back to the right side, which may itself be ``None``.
        return rhs_rows
    if rhs_rows is None:
        return lhs_rows
    return max(lhs_rows, rhs_rows)

def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
assert not self.completed()
assert input_index == 0 or input_index == 1, input_index
Expand Down
4 changes: 3 additions & 1 deletion python/ray/data/_internal/execution/streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,10 @@ def execute(

if not isinstance(dag, InputDataBuffer):
# Note: DAG must be initialized in order to query num_outputs_total.
# TODO(zhilong): Implement num_output_rows_total for all
# AllToAllOperators
self._global_info = ProgressBar(
"Running", dag.estimated_output_num_rows, unit="row"
"Running", dag.num_output_rows_total(), unit="row"
)

self._output_node: OpState = self._topology[dag]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ def initialize_progress_bars(self, index: int, verbose_progress: bool) -> int:
)
self.progress_bar = ProgressBar(
"- " + self.op.name,
self.op.estimated_output_num_rows,
self.op.num_output_rows_total(),
unit="row",
position=index,
enabled=progress_bar_enabled,
Expand Down Expand Up @@ -242,8 +242,7 @@ def add_output(self, ref: RefBundle) -> None:
self.outqueue.append(ref)
self.num_completed_tasks += 1
if self.progress_bar:
num_rows = sum(meta.num_rows for _, meta in ref.blocks)
self.progress_bar.update(num_rows, self.op.estimated_output_num_rows)
self.progress_bar.update(ref.num_rows(), self.op.num_output_rows_total())

def refresh_progress_bar(self, resource_manager: ResourceManager) -> None:
"""Update the console with the latest operator progress."""
Expand Down
18 changes: 14 additions & 4 deletions python/ray/data/_internal/progress_bar.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import ray
from ray.experimental import tqdm_ray
from ray.experimental.tqdm_ray import format_num
from ray.types import ObjectRef
from ray.util.annotations import Deprecated

Expand Down Expand Up @@ -119,16 +120,25 @@ def fetch_until_complete(self, refs: List[ObjectRef]) -> List[Any]:
def set_description(self, name: str) -> None:
if self._bar and name != self._desc:
self._desc = name
self._bar.set_description(self._desc)
formatted_progress = format_num(self._progress)
formatted_total = (
format_num(self._bar.total) if self._bar.total is not None else "??"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and elsewhere, I don't think we need the special case for handling self._bar.total being None. According to the tqdm docs, this is already handled by not showing the denominator (only basic stats showing the numerator and rows/s):

The expected total number of iterations. If meaningless (None), only basic progress statistics are displayed (no ETA).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since I update the progress bar description manually, I need to make sure that the value passed to format_num (a number or None) is handled correctly. format_num raises an error when it is passed None, so I show ?? in that case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if we change to:

formatted_total = (
                format_num(self._bar.total) if self._bar.total is not None else self._bar.total

so we only apply format_num if self._bar.total is not None.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of updating the bar description itself and calling format_num() for the updated description, maybe we can pass bar_format parameter to tqdm.tqdm(). specifically in this r_bar section:

r_bar='| {format_num(n_fmt)}/{format_num(total_fmt)} [{elapsed}<{remaining}, ' '{rate_fmt}{postfix}]'

something like this. then, i think we wouldn't need to manually update the entire description here and in _get_state()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have tried that. This is what the bar currently looks like:
image
But if we use bar_format, we lose some information here, so I think the simplest solution is to just change the number.
image

)
self._bar.set_description(
f"{self._desc} {formatted_progress}/{formatted_total}"
)

def update(self, i: int = 0, total: Optional[int] = None) -> None:
    """Advance the bar by ``i`` rows and refresh its total and description.

    Nothing happens when there is no underlying bar, or when ``i`` is 0
    and ``total`` already equals the bar's total.

    Args:
        i: Number of rows completed since the previous call.
        total: Latest estimate of the total number of rows, or ``None``
            when no estimate is available. ``None`` leaves the bar's
            current total untouched.
    """
    if not self._bar:
        return
    if i == 0 and self._bar.total == total:
        # Neither the progress nor the total changed.
        return

    self._progress += i
    if total is not None:
        self._bar.total = total
    if self._bar.total is not None and self._progress > self._bar.total:
        # If the progress goes over 100%, update the total.
        self._bar.total = self._progress

    formatted_progress = format_num(self._progress)
    # The bar's total can still be None (no estimate yet); format_num cannot
    # format None, so show "??" in that case. The check uses the bar's total
    # rather than the ``total`` argument so a known total is always shown.
    formatted_total = (
        format_num(self._bar.total) if self._bar.total is not None else "??"
    )
    # Keep the text in a single literal: a backslash continuation inside the
    # f-string would embed the next source line's indentation in the output.
    self._bar.set_description(
        f"{self._desc} {formatted_progress}/{formatted_total}"
    )
    self._bar.update(i)

def close(self):
Expand Down
14 changes: 12 additions & 2 deletions python/ray/experimental/tqdm_ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ def safe_print(*args, **kwargs):
instance().unhide_bars()


def format_num(n):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of duplicating tqdm source code logic here, could we directly use tqdm.tqdm.format_num() in other places instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered this, but in the end I just copied the code of format_num. The reasons I define it here are: 1. tqdm is not currently imported in tqdm_ray, and we only need the format_num function. 2. We define our own tqdm class in this module, so I would rather not import tqdm here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah got it, that makes sense. i see the real_tqdm import is importing tqdm.auto, which does not have format_num. let's keep the method you copied here, thanks

"""Intelligent scientific notation (.3g)."""
f = f"{n:.3g}".replace("e+0", "e+").replace("e-0", "e-")
n = str(n)
return f if len(f) < len(n) else n


class tqdm:
"""Experimental: Ray distributed tqdm implementation.

Expand Down Expand Up @@ -99,7 +106,8 @@ def __init__(

def set_description(self, desc):
    """Implements tqdm.tqdm.set_description.

    The stored description is ``desc`` followed by ``(<progress>/<total>)``.
    The total is rendered as ``??`` when ``self._total`` is unset or zero.
    The updated state is then dumped via ``self._dump_state()``.
    """
    total_text = format_num(self._total) if self._total else "??"
    # Build the text in a single-line f-string: a backslash continuation
    # inside the literal would make the output depend on source indentation.
    self._desc = f"{desc} ({format_num(self._x)}/{total_text})"
    self._dump_state()

def update(self, n=1):
Expand Down Expand Up @@ -139,11 +147,13 @@ def _dump_state(self, force_flush=False) -> None:
instance().process_state_update(copy.deepcopy(self._get_state()))

def _get_state(self) -> ProgressBarState:
"""Get the formatted state of the progress bar."""
return {
"__magic_token__": RAY_TQDM_MAGIC,
"x": self._x,
"pos": self._pos,
"desc": self._desc,
"desc": f"{self._desc} {format_num(self._x)}\
/{format_num(self._total) if self._total else '??'}",
"total": self._total,
"unit": self._unit,
"ip": self._ip,
Expand Down