diff --git a/python/ray/data/_internal/execution/interfaces/physical_operator.py b/python/ray/data/_internal/execution/interfaces/physical_operator.py index 47052aec0cde..2753ca63be7f 100644 --- a/python/ray/data/_internal/execution/interfaces/physical_operator.py +++ b/python/ray/data/_internal/execution/interfaces/physical_operator.py @@ -185,6 +185,7 @@ def __init__( self._in_task_submission_backpressure = False self._metrics = OpRuntimeMetrics(self) self._estimated_num_output_bundles = None + self._estimated_output_num_rows = None self._execution_completed = False def __reduce__(self): @@ -276,6 +277,19 @@ def num_outputs_total(self) -> Optional[int]: """ return self._estimated_num_output_bundles + def num_output_rows_total(self) -> Optional[int]: + """Returns the total number of output rows of this operator, + or ``None`` if unable to provide a reasonable estimate (for example, + if no tasks have finished yet). + + The value returned may be an estimate based on the consumption so far. + This is useful for reporting progress. + + Subclasses should either override this method, or update + ``self._estimated_output_num_rows`` appropriately. + """ + return self._estimated_output_num_rows + def start(self, options: ExecutionOptions) -> None: """Called by the executor when execution starts for an operator. 
diff --git a/python/ray/data/_internal/execution/operators/limit_operator.py b/python/ray/data/_internal/execution/operators/limit_operator.py index ef0ca8c5b26d..a557762ec69a 100644 --- a/python/ray/data/_internal/execution/operators/limit_operator.py +++ b/python/ray/data/_internal/execution/operators/limit_operator.py @@ -116,6 +116,14 @@ def num_outputs_total(self) -> Optional[int]: return self._cur_output_bundles return self._estimated_num_output_bundles + def num_output_rows_total(self) -> Optional[int]: + # The total number of rows is simply the limit or the number + # of input rows, whichever is smaller + input_num_rows = self.input_dependencies[0].num_output_rows_total() + if input_num_rows is None: + return None + return min(self._limit, input_num_rows) + def throttling_disabled(self) -> bool: return True diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py index 8ed617e34490..c665d035a1c5 100644 --- a/python/ray/data/_internal/execution/operators/map_operator.py +++ b/python/ray/data/_internal/execution/operators/map_operator.py @@ -316,7 +316,8 @@ def _output_ready_callback(task_index, output: RefBundle): def _task_done_callback(task_index: int, exception: Optional[Exception]): self._metrics.on_task_finished(task_index, exception) - # Estimate number of tasks from inputs received and tasks submitted so far + # Estimate number of tasks and rows from inputs received and tasks + # submitted so far upstream_op_num_outputs = self.input_dependencies[0].num_outputs_total() if upstream_op_num_outputs: estimated_num_tasks = ( @@ -329,6 +330,11 @@ def _task_done_callback(task_index: int, exception: Optional[Exception]): * self._metrics.num_outputs_of_finished_tasks / self._metrics.num_tasks_finished ) + self._estimated_output_num_rows = round( + estimated_num_tasks + * self._metrics.rows_task_outputs_generated + / self._metrics.num_tasks_finished + ) 
self._data_tasks.pop(task_index) # Notify output queue that this task is complete. diff --git a/python/ray/data/_internal/execution/operators/output_splitter.py b/python/ray/data/_internal/execution/operators/output_splitter.py index 38517a9e8633..f5a9b6c55d84 100644 --- a/python/ray/data/_internal/execution/operators/output_splitter.py +++ b/python/ray/data/_internal/execution/operators/output_splitter.py @@ -74,6 +74,10 @@ def num_outputs_total(self) -> Optional[int]: # so we can return the number of blocks from the input op. return self.input_dependencies[0].num_outputs_total() + def num_output_rows_total(self) -> Optional[int]: + # The total number of rows is the same as the number of input rows. + return self.input_dependencies[0].num_output_rows_total() + def start(self, options: ExecutionOptions) -> None: super().start(options) # Force disable locality optimization. diff --git a/python/ray/data/_internal/execution/operators/union_operator.py b/python/ray/data/_internal/execution/operators/union_operator.py index 4a2a73d7817a..815295070c16 100644 --- a/python/ray/data/_internal/execution/operators/union_operator.py +++ b/python/ray/data/_internal/execution/operators/union_operator.py @@ -55,6 +55,15 @@ def num_outputs_total(self) -> Optional[int]: num_outputs += input_num_outputs return num_outputs + def num_output_rows_total(self) -> Optional[int]: + total_rows = 0 + for input_op in self.input_dependencies: + input_num_rows = input_op.num_output_rows_total() + if input_num_rows is None: + return None + total_rows += input_num_rows + return total_rows + def _add_input_inner(self, refs: RefBundle, input_index: int) -> None: assert not self.completed() assert 0 <= input_index <= len(self._input_dependencies), input_index diff --git a/python/ray/data/_internal/execution/operators/zip_operator.py b/python/ray/data/_internal/execution/operators/zip_operator.py index da54874e6281..ed1abf31a5fa 100644 --- 
a/python/ray/data/_internal/execution/operators/zip_operator.py +++ b/python/ray/data/_internal/execution/operators/zip_operator.py @@ -53,6 +53,16 @@ def num_outputs_total(self) -> Optional[int]: else: return right_num_outputs + def num_output_rows_total(self) -> Optional[int]: + left_num_rows = self.input_dependencies[0].num_output_rows_total() + right_num_rows = self.input_dependencies[1].num_output_rows_total() + if left_num_rows is not None and right_num_rows is not None: + return max(left_num_rows, right_num_rows) + elif left_num_rows is not None: + return left_num_rows + else: + return right_num_rows + def _add_input_inner(self, refs: RefBundle, input_index: int) -> None: assert not self.completed() assert input_index == 0 or input_index == 1, input_index diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index c7d8521ac1e5..4e9119b0dc4c 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -128,8 +128,10 @@ def execute( if not isinstance(dag, InputDataBuffer): # Note: DAG must be initialized in order to query num_outputs_total. 
+ # TODO(zhilong): Implement num_output_rows_total for all + # AllToAllOperators self._global_info = ProgressBar( - "Running", dag.num_outputs_total(), unit="bundle" + "Running", dag.num_output_rows_total(), unit="row" ) self._output_node: OpState = self._topology[dag] diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index cb655ae16c22..fff7c48d3bd1 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -213,8 +213,8 @@ def initialize_progress_bars(self, index: int, verbose_progress: bool) -> int: ) self.progress_bar = ProgressBar( "- " + self.op.name, - self.op.num_outputs_total(), - unit="bundle", + self.op.num_output_rows_total(), + unit="row", position=index, enabled=progress_bar_enabled, ) @@ -245,7 +245,10 @@ def add_output(self, ref: RefBundle) -> None: self.outqueue.append(ref) self.num_completed_tasks += 1 if self.progress_bar: - self.progress_bar.update(1, self.op.num_outputs_total()) + assert ( + ref.num_rows() is not None + ), "RefBundle must have a valid number of rows" + self.progress_bar.update(ref.num_rows(), self.op.num_output_rows_total()) def refresh_progress_bar(self, resource_manager: ResourceManager) -> None: """Update the console with the latest operator progress."""