Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Data]Update Data progress bars to use row as the iteration unit #46699

Merged
merged 18 commits into from
Aug 2, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -459,3 +459,8 @@ def implements_accurate_memory_accounting(self) -> bool:
def supports_fusion(self) -> bool:
"""Returns ```True``` if this operator can be fused with other operators."""
return False

@property
def estimated_output_num_rows(self) -> Optional[int]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: rename property to keep naming convention consistent with num_outputs_total

Suggested change
def estimated_output_num_rows(self) -> Optional[int]:
def num_output_rows_total(self) -> Optional[int]:

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, can we add _estimated_output_num_rows as a class attribute in PhysicalOperator.__init__() (set to None)? so that here, we can simply return self._estimated_output_num_rows. and in MapOperator, we won't need to define its own estimated_output_num_rows

"""Return the estimated number of output rows for this operator."""
return None
12 changes: 11 additions & 1 deletion python/ray/data/_internal/execution/operators/map_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,8 @@ def _output_ready_callback(task_index, output: RefBundle):
def _task_done_callback(task_index: int, exception: Optional[Exception]):
self._metrics.on_task_finished(task_index, exception)

# Estimate number of tasks from inputs received and tasks submitted so far
# Estimate number of tasks and rows from inputs received and tasks
# submitted so far
upstream_op_num_outputs = self.input_dependencies[0].num_outputs_total()
if upstream_op_num_outputs:
estimated_num_tasks = (
Expand All @@ -329,6 +330,11 @@ def _task_done_callback(task_index: int, exception: Optional[Exception]):
* self._metrics.num_outputs_of_finished_tasks
/ self._metrics.num_tasks_finished
)
self._estimated_num_output_rows = round(
estimated_num_tasks
* self._metrics.rows_task_outputs_generated
/ self._metrics.num_tasks_finished
)

self._data_tasks.pop(task_index)
# Notify output queue that this task is complete.
Expand Down Expand Up @@ -427,6 +433,10 @@ def num_active_tasks(self) -> int:
# to reflect the actual data processing tasks.
return len(self._data_tasks)

@property
def estimated_output_num_rows(self) -> Optional[int]:
return getattr(self, "_estimated_output_num_rows", 0)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't need this once we implement class attribute at the PhysicalOperator level.


def _map_task(
map_transformer: MapTransformer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def execute(
if not isinstance(dag, InputDataBuffer):
# Note: DAG must be initialized in order to query num_outputs_total.
self._global_info = ProgressBar(
"Running", dag.num_outputs_total(), unit="bundle"
"Running", dag.estimated_output_num_rows, unit="row"
)

self._output_node: OpState = self._topology[dag]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ def initialize_progress_bars(self, index: int, verbose_progress: bool) -> int:
)
self.progress_bar = ProgressBar(
"- " + self.op.name,
self.op.num_outputs_total(),
unit="bundle",
self.op.estimated_output_num_rows,
unit="row",
position=index,
enabled=progress_bar_enabled,
)
Expand Down Expand Up @@ -242,7 +242,8 @@ def add_output(self, ref: RefBundle) -> None:
self.outqueue.append(ref)
self.num_completed_tasks += 1
if self.progress_bar:
self.progress_bar.update(1, self.op.num_outputs_total())
num_rows = sum(meta.num_rows for _, meta in ref.blocks)
self.progress_bar.update(num_rows, self.op.estimated_output_num_rows)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
num_rows = sum(meta.num_rows for _, meta in ref.blocks)
self.progress_bar.update(num_rows, self.op.estimated_output_num_rows)
self.progress_bar.update(ref.num_rows(), self.op.estimated_output_num_rows)


def refresh_progress_bar(self, resource_manager: ResourceManager) -> None:
"""Update the console with the latest operator progress."""
Expand Down