Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Data]Update Data progress bars to use row as the iteration unit #46699

Merged
merged 18 commits into from
Aug 2, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ def __init__(
self._in_task_submission_backpressure = False
self._metrics = OpRuntimeMetrics(self)
self._estimated_num_output_bundles = None
self._estimated_output_num_rows = None
self._execution_completed = False

def __reduce__(self):
Expand Down Expand Up @@ -276,6 +277,19 @@ def num_outputs_total(self) -> Optional[int]:
"""
return self._estimated_num_output_bundles

def num_output_rows_total(self) -> Optional[int]:
    """Report how many rows this operator is expected to output in total.

    The figure may be an estimate derived from what has been consumed so
    far; it exists mainly to drive progress reporting.

    Subclasses are expected to either override this method or keep
    ``self._estimated_output_num_rows`` up to date.

    Returns:
        The (possibly estimated) total number of output rows, or ``None``
        when no reasonable estimate is available yet (for example, before
        any task has finished).
    """
    estimated_rows = self._estimated_output_num_rows
    return estimated_rows

def start(self, options: ExecutionOptions) -> None:
"""Called by the executor when execution starts for an operator.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,14 @@ def num_outputs_total(self) -> Optional[int]:
return self._cur_output_bundles
return self._estimated_num_output_bundles

def num_output_rows_total(self) -> Optional[int]:
    """Return the expected output row count, capped at ``self._limit``.

    The result is the smaller of the limit and the row total reported by
    the first input dependency.

    Returns:
        ``None`` when the upstream operator has no row estimate yet,
        otherwise ``min(limit, upstream rows)``.
    """
    upstream_rows = self.input_dependencies[0].num_output_rows_total()
    if upstream_rows is not None:
        return min(self._limit, upstream_rows)
    return None

def throttling_disabled(self) -> bool:
    """Report that throttling is disabled for this operator (always ``True``)."""
    return True

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,8 @@ def _output_ready_callback(task_index, output: RefBundle):
def _task_done_callback(task_index: int, exception: Optional[Exception]):
self._metrics.on_task_finished(task_index, exception)

# Estimate number of tasks from inputs received and tasks submitted so far
# Estimate number of tasks and rows from inputs received and tasks
# submitted so far
upstream_op_num_outputs = self.input_dependencies[0].num_outputs_total()
if upstream_op_num_outputs:
estimated_num_tasks = (
Expand All @@ -329,6 +330,11 @@ def _task_done_callback(task_index: int, exception: Optional[Exception]):
* self._metrics.num_outputs_of_finished_tasks
/ self._metrics.num_tasks_finished
)
# Store the row estimate on ``_estimated_output_num_rows``: that is the
# attribute initialised in ``__init__`` and returned by
# ``num_output_rows_total()``. Writing to any other name leaves the
# reported estimate stuck at ``None``.
self._estimated_output_num_rows = round(
estimated_num_tasks
* self._metrics.rows_task_outputs_generated
/ self._metrics.num_tasks_finished
)

self._data_tasks.pop(task_index)
# Notify output queue that this task is complete.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ def num_outputs_total(self) -> Optional[int]:
# so we can return the number of blocks from the input op.
return self.input_dependencies[0].num_outputs_total()

def num_output_rows_total(self) -> Optional[int]:
    """Forward the row total reported by the first input dependency.

    This operator emits as many rows as it receives, so the upstream
    figure (or ``None`` when upstream has no estimate) is returned as is.
    """
    upstream_op = self.input_dependencies[0]
    return upstream_op.num_output_rows_total()

def start(self, options: ExecutionOptions) -> None:
super().start(options)
# Force disable locality optimization.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ def num_outputs_total(self) -> Optional[int]:
num_outputs += input_num_outputs
return num_outputs

def num_output_rows_total(self) -> Optional[int]:
    """Return the combined row total across all input dependencies.

    Returns:
        The sum of every input's ``num_output_rows_total()``, or ``None``
        as soon as any input cannot provide an estimate (a partial sum
        would under-report the total).
    """
    total_rows = 0
    for input_op in self.input_dependencies:
        input_num_rows = input_op.num_output_rows_total()
        if input_num_rows is None:
            return None
        total_rows += input_num_rows
    return total_rows

def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
assert not self.completed()
assert 0 <= input_index <= len(self._input_dependencies), input_index
Expand Down
10 changes: 10 additions & 0 deletions python/ray/data/_internal/execution/operators/zip_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ def num_outputs_total(self) -> Optional[int]:
else:
return right_num_outputs

def num_output_rows_total(self) -> Optional[int]:
    """Return the expected output row count from the two inputs' estimates.

    Returns:
        The larger of the two estimates when both inputs report one, the
        single available estimate when only one does, and ``None`` when
        neither input can estimate its row count.
    """
    left_rows = self.input_dependencies[0].num_output_rows_total()
    right_rows = self.input_dependencies[1].num_output_rows_total()
    known_rows = [rows for rows in (left_rows, right_rows) if rows is not None]
    return max(known_rows) if known_rows else None

def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
assert not self.completed()
assert input_index == 0 or input_index == 1, input_index
Expand Down
4 changes: 3 additions & 1 deletion python/ray/data/_internal/execution/streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,10 @@ def execute(

if not isinstance(dag, InputDataBuffer):
# Note: DAG must be initialized in order to query num_outputs_total.
# TODO(zhilong): Implement num_output_rows_total for all
# AllToAllOperators
self._global_info = ProgressBar(
"Running", dag.num_outputs_total(), unit="bundle"
"Running", dag.num_output_rows_total(), unit="row"
)

self._output_node: OpState = self._topology[dag]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ def initialize_progress_bars(self, index: int, verbose_progress: bool) -> int:
)
self.progress_bar = ProgressBar(
"- " + self.op.name,
self.op.num_outputs_total(),
unit="bundle",
self.op.num_output_rows_total(),
unit="row",
position=index,
enabled=progress_bar_enabled,
)
Expand Down Expand Up @@ -242,7 +242,7 @@ def add_output(self, ref: RefBundle) -> None:
self.outqueue.append(ref)
self.num_completed_tasks += 1
if self.progress_bar:
self.progress_bar.update(1, self.op.num_outputs_total())
self.progress_bar.update(ref.num_rows(), self.op.num_output_rows_total())

def refresh_progress_bar(self, resource_manager: ResourceManager) -> None:
"""Update the console with the latest operator progress."""
Expand Down