From 4cb78b0e33a062fa364fa9e67fcda03d3f341827 Mon Sep 17 00:00:00 2001 From: zhilong Date: Thu, 18 Jul 2024 17:01:25 -0400 Subject: [PATCH 01/13] fix Signed-off-by: zhilong --- .../execution/operators/map_operator.py | 17 ++++++++++------- .../_internal/execution/streaming_executor.py | 14 +++----------- .../execution/streaming_executor_state.py | 14 ++++---------- 3 files changed, 17 insertions(+), 28 deletions(-) diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py index 8ed617e34490..635aac1c66d4 100644 --- a/python/ray/data/_internal/execution/operators/map_operator.py +++ b/python/ray/data/_internal/execution/operators/map_operator.py @@ -316,7 +316,7 @@ def _output_ready_callback(task_index, output: RefBundle): def _task_done_callback(task_index: int, exception: Optional[Exception]): self._metrics.on_task_finished(task_index, exception) - # Estimate number of tasks from inputs received and tasks submitted so far + # Estimate number of tasks and rows from inputs received and tasks submitted so far upstream_op_num_outputs = self.input_dependencies[0].num_outputs_total() if upstream_op_num_outputs: estimated_num_tasks = ( @@ -329,6 +329,11 @@ def _task_done_callback(task_index: int, exception: Optional[Exception]): * self._metrics.num_outputs_of_finished_tasks / self._metrics.num_tasks_finished ) + # Estimate the number of output rows based on completed tasks + total_output_rows = sum(bundle.num_rows() for bundle in self._output_metadata) + self._estimated_output_num_rows = round( + total_output_rows / self._metrics.num_tasks_finished * estimated_num_tasks + ) self._data_tasks.pop(task_index) # Notify output queue that this task is complete. @@ -336,12 +341,6 @@ def _task_done_callback(task_index: int, exception: Optional[Exception]): if task_done_callback: task_done_callback() - self._data_tasks[task_index] = DataOpTask( - task_index, - gen, - lambda output: _output_ready_callback(task_index, output), - functools.partial(_task_done_callback, task_index), - ) def _submit_metadata_task( self, result_ref: ObjectRef, task_done_callback: Callable[[], None] @@ -426,6 +425,10 @@ def num_active_tasks(self) -> int: # 2. The number of active tasks in the progress bar will be more accurate # to reflect the actual data processing tasks. return len(self._data_tasks) + + @property + def estimated_output_num_rows(self) -> Optional[int]: + return getattr(self, '_estimated_output_num_rows', None) def _map_task( diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index fb566d9bd15b..1acdfcfac298 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -99,15 +99,7 @@ def execute( self._start_time = time.perf_counter() if not isinstance(dag, InputDataBuffer): - context = DataContext.get_current() - if context.print_on_execution_start: - message = "Starting execution of Dataset." - log_path = get_log_directory() - if log_path is not None: - message += f" Full logs are in {log_path}" - logger.info(message) - logger.info(f"Execution plan of Dataset: {dag}") - + # existing code logger.debug("Execution config: %s", self._options) # Setup the streaming DAG topology and start the runner thread. @@ -123,9 +115,9 @@ def execute( self._has_op_completed = {op: False for op in self._topology} if not isinstance(dag, InputDataBuffer): - # Note: DAG must be initialized in order to query num_outputs_total. + total_estimated_rows = dag.estimated_output_num_rows or dag.num_outputs_total() self._global_info = ProgressBar( - "Running", dag.num_outputs_total(), unit="bundle" + "Running", total_estimated_rows, unit="row" ) self._output_node: OpState = self._topology[dag] diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index 28376c149694..3bda38fa63a5 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -198,30 +198,24 @@ def __repr__(self): return f"OpState({self.op.name})" def initialize_progress_bars(self, index: int, verbose_progress: bool) -> int: - """Create progress bars at the given index (line offset in console). - - For AllToAllOperator, zero or more sub progress bar would be created. - Return the number of enabled progress bars created for this operator. - """ is_all_to_all = isinstance(self.op, AllToAllOperator) - # Only show 1:1 ops when in verbose progress mode. progress_bar_enabled = DataContext.get_current().enable_progress_bars and ( is_all_to_all or verbose_progress ) + total_estimated_rows = self.op.estimated_output_num_rows or self.op.num_outputs_total() self.progress_bar = ProgressBar( "- " + self.op.name, - self.op.num_outputs_total(), - unit="bundle", + total_estimated_rows, + unit="row", position=index, enabled=progress_bar_enabled, ) num_progress_bars = 1 if is_all_to_all: - # Initialize must be called for sub progress bars, even the - # bars are not enabled via the DataContext. num_progress_bars += self.op.initialize_sub_progress_bars(index + 1) return num_progress_bars if progress_bar_enabled else 0 + def close_progress_bars(self): """Close all progress bars for this operator.""" if self.progress_bar: From 71617543b23dd95ad8e75bdae3c1b4b3460efd5f Mon Sep 17 00:00:00 2001 From: zhilong Date: Fri, 19 Jul 2024 11:18:43 -0400 Subject: [PATCH 02/13] fix Signed-off-by: zhilong --- .../_internal/execution/operators/map_operator.py | 7 +++++++ .../data/_internal/execution/streaming_executor.py | 10 +++++++++- .../_internal/execution/streaming_executor_state.py | 11 +++++++++-- 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py index 635aac1c66d4..73dfff70b1be 100644 --- a/python/ray/data/_internal/execution/operators/map_operator.py +++ b/python/ray/data/_internal/execution/operators/map_operator.py @@ -340,6 +340,13 @@ def _task_done_callback(task_index: int, exception: Optional[Exception]): self._output_queue.notify_task_completed(task_index) if task_done_callback: task_done_callback() + + self._data_tasks[task_index] = DataOpTask( + task_index, + gen, + lambda output: _output_ready_callback(task_index, output), + functools.partial(_task_done_callback, task_index), + ) def _submit_metadata_task( diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index 1acdfcfac298..e29fc53a39d3 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -99,7 +99,14 @@ def execute( self._start_time = time.perf_counter() if not isinstance(dag, InputDataBuffer): - # existing code + context = DataContext.get_current() + if context.print_on_execution_start: + message = "Starting execution of Dataset." + log_path = get_log_directory() + if log_path is not None: + message += f" Full logs are in {log_path}" + logger.info(message) + logger.info(f"Execution plan of Dataset: {dag}") logger.debug("Execution config: %s", self._options) # Setup the streaming DAG topology and start the runner thread. @@ -115,6 +122,7 @@ def execute( self._has_op_completed = {op: False for op in self._topology} if not isinstance(dag, InputDataBuffer): + # Note: DAG must be initialized in order to query num_outputs_total. total_estimated_rows = dag.estimated_output_num_rows or dag.num_outputs_total() self._global_info = ProgressBar( "Running", total_estimated_rows, unit="row" diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index 3bda38fa63a5..65dbd9868caa 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -198,20 +198,27 @@ def __repr__(self): return f"OpState({self.op.name})" def initialize_progress_bars(self, index: int, verbose_progress: bool) -> int: + """Create progress bars at the given index (line offset in console). + + For AllToAllOperator, zero or more sub progress bar would be created. + Return the number of enabled progress bars created for this operator. + """ is_all_to_all = isinstance(self.op, AllToAllOperator) + # Only show 1:1 ops when in verbose progress mode. progress_bar_enabled = DataContext.get_current().enable_progress_bars and ( is_all_to_all or verbose_progress ) - total_estimated_rows = self.op.estimated_output_num_rows or self.op.num_outputs_total() self.progress_bar = ProgressBar( "- " + self.op.name, - total_estimated_rows, + self.op.estimated_output_num_rows(), unit="row", position=index, enabled=progress_bar_enabled, ) num_progress_bars = 1 if is_all_to_all: + # Initialize must be called for sub progress bars, even the + # bars are not enabled via the DataContext. num_progress_bars += self.op.initialize_sub_progress_bars(index + 1) return num_progress_bars if progress_bar_enabled else 0 From 1f5ddace4ff9a9deffa5c56bed5d20704dd10e14 Mon Sep 17 00:00:00 2001 From: zhilong Date: Fri, 19 Jul 2024 11:21:29 -0400 Subject: [PATCH 03/13] fix Signed-off-by: zhilong --- python/ray/data/_internal/execution/operators/map_operator.py | 3 +-- python/ray/data/_internal/execution/streaming_executor.py | 1 + .../ray/data/_internal/execution/streaming_executor_state.py | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py index 73dfff70b1be..c85c5954438e 100644 --- a/python/ray/data/_internal/execution/operators/map_operator.py +++ b/python/ray/data/_internal/execution/operators/map_operator.py @@ -340,7 +340,7 @@ def _task_done_callback(task_index: int, exception: Optional[Exception]): self._output_queue.notify_task_completed(task_index) if task_done_callback: task_done_callback() - + self._data_tasks[task_index] = DataOpTask( task_index, gen, @@ -348,7 +348,6 @@ def _task_done_callback(task_index: int, exception: Optional[Exception]): functools.partial(_task_done_callback, task_index), ) - def _submit_metadata_task( self, result_ref: ObjectRef, task_done_callback: Callable[[], None] ): diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index e29fc53a39d3..edf236e72c53 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -107,6 +107,7 @@ def execute( message += f" Full logs are in {log_path}" logger.info(message) logger.info(f"Execution plan of Dataset: {dag}") + logger.debug("Execution config: %s", self._options) # Setup the streaming DAG topology and start the runner thread. diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index 65dbd9868caa..7ec7a9d25df0 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -222,7 +222,6 @@ def initialize_progress_bars(self, index: int, verbose_progress: bool) -> int: num_progress_bars += self.op.initialize_sub_progress_bars(index + 1) return num_progress_bars if progress_bar_enabled else 0 - def close_progress_bars(self): """Close all progress bars for this operator.""" if self.progress_bar: From 6b5857c121627803d6795156cb44d5570d5b6b65 Mon Sep 17 00:00:00 2001 From: zhilong Date: Fri, 19 Jul 2024 11:28:32 -0400 Subject: [PATCH 04/13] fix Signed-off-by: zhilong --- .../_internal/execution/operators/map_operator.py | 12 ++++++++---- .../data/_internal/execution/streaming_executor.py | 8 ++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py index c85c5954438e..5420add1585c 100644 --- a/python/ray/data/_internal/execution/operators/map_operator.py +++ b/python/ray/data/_internal/execution/operators/map_operator.py @@ -330,9 +330,13 @@ def _task_done_callback(task_index: int, exception: Optional[Exception]): / self._metrics.num_tasks_finished ) # Estimate the number of output rows based on completed tasks - total_output_rows = sum(bundle.num_rows() for bundle in self._output_metadata) + total_output_rows = sum( + bundle.num_rows() for bundle in self._output_metadata + ) self._estimated_output_num_rows = round( - total_output_rows / self._metrics.num_tasks_finished * estimated_num_tasks + total_output_rows + / self._metrics.num_tasks_finished + * estimated_num_tasks ) self._data_tasks.pop(task_index) @@ -431,10 +435,10 @@ def num_active_tasks(self) -> int: # 2. The number of active tasks in the progress bar will be more accurate # to reflect the actual data processing tasks. return len(self._data_tasks) - + @property def estimated_output_num_rows(self) -> Optional[int]: - return getattr(self, '_estimated_output_num_rows', None) + return getattr(self, "_estimated_output_num_rows", None) def _map_task( diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index edf236e72c53..d42bdcd9a8db 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -107,7 +107,7 @@ def execute( message += f" Full logs are in {log_path}" logger.info(message) logger.info(f"Execution plan of Dataset: {dag}") - + logger.debug("Execution config: %s", self._options) # Setup the streaming DAG topology and start the runner thread. @@ -124,10 +124,10 @@ def execute( if not isinstance(dag, InputDataBuffer): # Note: DAG must be initialized in order to query num_outputs_total. - total_estimated_rows = dag.estimated_output_num_rows or dag.num_outputs_total() - self._global_info = ProgressBar( - "Running", total_estimated_rows, unit="row" + total_estimated_rows = ( + dag.estimated_output_num_rows or dag.num_outputs_total() ) + self._global_info = ProgressBar("Running", total_estimated_rows, unit="row") self._output_node: OpState = self._topology[dag] StatsManager.register_dataset_to_stats_actor( From 1501a6c03adb1bcb8b8f15a39a8c961aebce292d Mon Sep 17 00:00:00 2001 From: zhilong Date: Fri, 19 Jul 2024 11:44:48 -0400 Subject: [PATCH 05/13] fix Signed-off-by: zhilong --- python/ray/data/_internal/execution/operators/map_operator.py | 2 +- python/ray/data/_internal/execution/streaming_executor_state.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py index 5420add1585c..8144e08fd6c6 100644 --- a/python/ray/data/_internal/execution/operators/map_operator.py +++ b/python/ray/data/_internal/execution/operators/map_operator.py @@ -438,7 +438,7 @@ def num_active_tasks(self) -> int: @property def estimated_output_num_rows(self) -> Optional[int]: - return getattr(self, "_estimated_output_num_rows", None) + return getattr(self, "_estimated_output_num_rows", 0) def _map_task( diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index 7ec7a9d25df0..ec65170c4c90 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -210,7 +210,7 @@ def initialize_progress_bars(self, index: int, verbose_progress: bool) -> int: ) self.progress_bar = ProgressBar( "- " + self.op.name, - self.op.estimated_output_num_rows(), + self.op.estimated_output_num_rows, unit="row", position=index, enabled=progress_bar_enabled, From 6e5a7844eb131011a0e8c7ea8d47db8e2d521fc2 Mon Sep 17 00:00:00 2001 From: zhilong Date: Fri, 19 Jul 2024 12:51:15 -0400 Subject: [PATCH 06/13] fix Signed-off-by: zhilong --- .../execution/interfaces/physical_operator.py | 5 +++++ .../data/_internal/execution/operators/map_operator.py | 10 +++------- .../ray/data/_internal/execution/streaming_executor.py | 5 ++--- .../_internal/execution/streaming_executor_state.py | 3 ++- 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/python/ray/data/_internal/execution/interfaces/physical_operator.py b/python/ray/data/_internal/execution/interfaces/physical_operator.py index 47052aec0cde..f69942a17cbd 100644 --- a/python/ray/data/_internal/execution/interfaces/physical_operator.py +++ b/python/ray/data/_internal/execution/interfaces/physical_operator.py @@ -459,3 +459,8 @@ def implements_accurate_memory_accounting(self) -> bool: def supports_fusion(self) -> bool: """Returns ```True``` if this operator can be fused with other operators.""" return False + + @property + def estimated_output_num_rows(self) -> Optional[int]: + """Return the estimated number of output rows for this operator.""" + return None diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py index 8144e08fd6c6..93d968e3dd67 100644 --- a/python/ray/data/_internal/execution/operators/map_operator.py +++ b/python/ray/data/_internal/execution/operators/map_operator.py @@ -329,14 +329,10 @@ def _task_done_callback(task_index: int, exception: Optional[Exception]): * self._metrics.num_outputs_of_finished_tasks / self._metrics.num_tasks_finished ) - # Estimate the number of output rows based on completed tasks - total_output_rows = sum( - bundle.num_rows() for bundle in self._output_metadata - ) - self._estimated_output_num_rows = round( - total_output_rows + self._estimated_num_output_rows = round( + estimated_num_tasks + * self._metrics.rows_task_outputs_generated / self._metrics.num_tasks_finished - * estimated_num_tasks ) self._data_tasks.pop(task_index) diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index d42bdcd9a8db..86fad5b752f6 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -124,10 +124,9 @@ def execute( if not isinstance(dag, InputDataBuffer): # Note: DAG must be initialized in order to query num_outputs_total. - total_estimated_rows = ( - dag.estimated_output_num_rows or dag.num_outputs_total() + self._global_info = ProgressBar( + "Running", dag.estimated_output_num_rows, unit="row" ) - self._global_info = ProgressBar("Running", total_estimated_rows, unit="row") self._output_node: OpState = self._topology[dag] StatsManager.register_dataset_to_stats_actor( diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index ec65170c4c90..c1c540d51b79 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -242,7 +242,8 @@ def add_output(self, ref: RefBundle) -> None: self.outqueue.append(ref) self.num_completed_tasks += 1 if self.progress_bar: - self.progress_bar.update(1, self.op.num_outputs_total()) + num_rows = sum(meta.num_rows for _, meta in ref.blocks) + self.progress_bar.update(num_rows, self.op.estimated_output_num_rows) def refresh_progress_bar(self, resource_manager: ResourceManager) -> None: """Update the console with the latest operator progress.""" From d4fc58ca49d6cf3b7e9f497d0b42f5c28c0cd1fa Mon Sep 17 00:00:00 2001 From: zhilong Date: Fri, 19 Jul 2024 13:21:23 -0400 Subject: [PATCH 07/13] fix Signed-off-by: zhilong --- python/ray/data/_internal/execution/operators/map_operator.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py index 93d968e3dd67..078de43126e3 100644 --- a/python/ray/data/_internal/execution/operators/map_operator.py +++ b/python/ray/data/_internal/execution/operators/map_operator.py @@ -316,7 +316,8 @@ def _output_ready_callback(task_index, output: RefBundle): def _task_done_callback(task_index: int, exception: Optional[Exception]): self._metrics.on_task_finished(task_index, exception) - # Estimate number of tasks and rows from inputs received and tasks submitted so far + # Estimate number of tasks and rows from inputs received and tasks + # submitted so far upstream_op_num_outputs = self.input_dependencies[0].num_outputs_total() if upstream_op_num_outputs: estimated_num_tasks = ( From 7d3e049407e978b6a60b199ce69c81343e7f1e5c Mon Sep 17 00:00:00 2001 From: zhilong Date: Tue, 23 Jul 2024 17:05:20 -0400 Subject: [PATCH 08/13] fix Signed-off-by: zhilong --- .../execution/interfaces/physical_operator.py | 21 +++++++++++++------ .../execution/operators/limit_operator.py | 8 +++++++ .../execution/operators/map_operator.py | 4 ---- .../execution/operators/output_splitter.py | 5 +++++ .../execution/operators/union_operator.py | 9 ++++++++ .../execution/operators/zip_operator.py | 11 ++++++++++ .../_internal/execution/streaming_executor.py | 4 +++- .../execution/streaming_executor_state.py | 5 ++--- python/ray/data/_internal/progress_bar.py | 1 - 9 files changed, 53 insertions(+), 15 deletions(-) diff --git a/python/ray/data/_internal/execution/interfaces/physical_operator.py b/python/ray/data/_internal/execution/interfaces/physical_operator.py index f69942a17cbd..cba0b4e1ebf0 100644 --- a/python/ray/data/_internal/execution/interfaces/physical_operator.py +++ b/python/ray/data/_internal/execution/interfaces/physical_operator.py @@ -185,6 +185,7 @@ def __init__( self._in_task_submission_backpressure = False self._metrics = OpRuntimeMetrics(self) self._estimated_num_output_bundles = None + self._estimated_output_num_rows = None self._execution_completed = False def __reduce__(self): @@ -275,6 +276,19 @@ def num_outputs_total(self) -> Optional[int]: ``self._estimated_num_output_bundles`` appropriately. """ return self._estimated_num_output_bundles + + def num_output_rows_total(self) -> Optional[int]: + """Returns the total number of output rows of this operator, + or ``None`` if unable to provide a reasonable estimate (for example, + if no tasks have finished yet). + + The value returned may be an estimate based off the consumption so far. + This is useful for reporting progress. + + Subclasses should either override this method, or update + ``self._estimated_output_num_rows`` appropriately. + """ + return self._estimated_output_num_rows def start(self, options: ExecutionOptions) -> None: """Called by the executor when execution starts for an operator. @@ -458,9 +472,4 @@ def implements_accurate_memory_accounting(self) -> bool: def supports_fusion(self) -> bool: """Returns ```True``` if this operator can be fused with other operators.""" - return False - - @property - def estimated_output_num_rows(self) -> Optional[int]: - """Return the estimated number of output rows for this operator.""" - return None + return False \ No newline at end of file diff --git a/python/ray/data/_internal/execution/operators/limit_operator.py b/python/ray/data/_internal/execution/operators/limit_operator.py index ef0ca8c5b26d..47d1079613a5 100644 --- a/python/ray/data/_internal/execution/operators/limit_operator.py +++ b/python/ray/data/_internal/execution/operators/limit_operator.py @@ -115,6 +115,14 @@ def num_outputs_total(self) -> Optional[int]: if self._execution_completed: return self._cur_output_bundles return self._estimated_num_output_bundles + + def num_output_rows_total(self) -> Optional[int]: + # The total number of rows is simply the limit or the number + # of input rows, whichever is smaller + input_num_rows = self.input_dependencies[0].num_output_rows_total() + if input_num_rows is None: + return None + return min(self._limit, input_num_rows) def throttling_disabled(self) -> bool: return True diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py index 078de43126e3..c665d035a1c5 100644 --- a/python/ray/data/_internal/execution/operators/map_operator.py +++ b/python/ray/data/_internal/execution/operators/map_operator.py @@ -433,10 +433,6 @@ def num_active_tasks(self) -> int: # to reflect the actual data processing tasks. return len(self._data_tasks) - @property - def estimated_output_num_rows(self) -> Optional[int]: - return getattr(self, "_estimated_output_num_rows", 0) - def _map_task( map_transformer: MapTransformer, diff --git a/python/ray/data/_internal/execution/operators/output_splitter.py b/python/ray/data/_internal/execution/operators/output_splitter.py index 38517a9e8633..69eed4ef2eaf 100644 --- a/python/ray/data/_internal/execution/operators/output_splitter.py +++ b/python/ray/data/_internal/execution/operators/output_splitter.py @@ -73,6 +73,11 @@ def num_outputs_total(self) -> Optional[int]: # OutputSplitter does not change the number of blocks, # so we can return the number of blocks from the input op. return self.input_dependencies[0].num_outputs_total() + + def num_output_rows_total(self) -> Optional[int]: + # The total number of rows is the same as the number of input rows. + return self.input_dependencies[0].num_output_rows_total() + def start(self, options: ExecutionOptions) -> None: super().start(options) diff --git a/python/ray/data/_internal/execution/operators/union_operator.py b/python/ray/data/_internal/execution/operators/union_operator.py index 4a2a73d7817a..815295070c16 100644 --- a/python/ray/data/_internal/execution/operators/union_operator.py +++ b/python/ray/data/_internal/execution/operators/union_operator.py @@ -55,6 +55,15 @@ def num_outputs_total(self) -> Optional[int]: num_outputs += input_num_outputs return num_outputs + def num_output_rows_total(self) -> Optional[int]: + total_rows = 0 + for input_op in self.input_dependencies: + input_num_rows = input_op.num_output_rows_total() + if input_num_rows is None: + return None + total_rows += input_num_rows + return total_rows + def _add_input_inner(self, refs: RefBundle, input_index: int) -> None: assert not self.completed() assert 0 <= input_index <= len(self._input_dependencies), input_index diff --git a/python/ray/data/_internal/execution/operators/zip_operator.py b/python/ray/data/_internal/execution/operators/zip_operator.py index da54874e6281..df8d90dfd291 100644 --- a/python/ray/data/_internal/execution/operators/zip_operator.py +++ b/python/ray/data/_internal/execution/operators/zip_operator.py @@ -52,6 +52,17 @@ def num_outputs_total(self) -> Optional[int]: return left_num_outputs else: return right_num_outputs + + def num_output_rows_total(self) -> Optional[int]: + left_num_rows = self.input_dependencies[0].num_output_rows_total() + right_num_rows = self.input_dependencies[1].num_output_rows_total() + if left_num_rows is not None and right_num_rows is not None: + return max(left_num_rows, right_num_rows) + elif left_num_rows is not None: + return left_num_rows + else: + return right_num_rows + def _add_input_inner(self, refs: RefBundle, input_index: int) -> None: assert not self.completed() diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index 86fad5b752f6..57b516fac6c9 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -124,8 +124,10 @@ def execute( if not isinstance(dag, InputDataBuffer): # Note: DAG must be initialized in order to query num_outputs_total. + # TODO(zhilong): Implement num_output_rows_total for all + # AllToAllOperators self._global_info = ProgressBar( - "Running", dag.estimated_output_num_rows, unit="row" + "Running", dag.num_output_rows_total(), unit="row" ) self._output_node: OpState = self._topology[dag] diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index c1c540d51b79..ff3e67fe42fd 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -210,7 +210,7 @@ def initialize_progress_bars(self, index: int, verbose_progress: bool) -> int: ) self.progress_bar = ProgressBar( "- " + self.op.name, - self.op.estimated_output_num_rows, + self.op.num_output_rows_total(), unit="row", position=index, enabled=progress_bar_enabled, @@ -242,8 +242,7 @@ def add_output(self, ref: RefBundle) -> None: self.outqueue.append(ref) self.num_completed_tasks += 1 if self.progress_bar: - num_rows = sum(meta.num_rows for _, meta in ref.blocks) - self.progress_bar.update(num_rows, self.op.estimated_output_num_rows) + self.progress_bar.update(ref.num_rows(), self.op.num_output_rows_total()) def refresh_progress_bar(self, resource_manager: ResourceManager) -> None: """Update the console with the latest operator progress.""" diff --git a/python/ray/data/_internal/progress_bar.py b/python/ray/data/_internal/progress_bar.py index e117dc1be481..5e35965d0b1f 100644 --- a/python/ray/data/_internal/progress_bar.py +++ b/python/ray/data/_internal/progress_bar.py @@ -18,7 +18,6 @@ _canceled_threads = set() _canceled_threads_lock = threading.Lock() - @Deprecated def set_progress_bars(enabled: bool) -> bool: """Set whether progress bars are enabled. From 1c8a61831df227405532e0a7ebaee7162d95669a Mon Sep 17 00:00:00 2001 From: zhilong Date: Wed, 24 Jul 2024 15:49:49 -0400 Subject: [PATCH 09/13] fix Signed-off-by: zhilong --- .../execution/interfaces/physical_operator.py | 4 ++-- .../execution/operators/limit_operator.py | 4 ++-- .../execution/operators/output_splitter.py | 3 +-- .../execution/operators/zip_operator.py | 3 +-- python/ray/data/_internal/progress_bar.py | 19 +++++++++++++++---- python/ray/experimental/tqdm_ray.py | 14 ++++++++++++-- 6 files changed, 33 insertions(+), 14 deletions(-) diff --git a/python/ray/data/_internal/execution/interfaces/physical_operator.py b/python/ray/data/_internal/execution/interfaces/physical_operator.py index cba0b4e1ebf0..2753ca63be7f 100644 --- a/python/ray/data/_internal/execution/interfaces/physical_operator.py +++ b/python/ray/data/_internal/execution/interfaces/physical_operator.py @@ -276,7 +276,7 @@ def num_outputs_total(self) -> Optional[int]: ``self._estimated_num_output_bundles`` appropriately. """ return self._estimated_num_output_bundles - + def num_output_rows_total(self) -> Optional[int]: """Returns the total number of output rows of this operator, or ``None`` if unable to provide a reasonable estimate (for example, @@ -472,4 +472,4 @@ def implements_accurate_memory_accounting(self) -> bool: def supports_fusion(self) -> bool: """Returns ```True``` if this operator can be fused with other operators.""" - return False \ No newline at end of file + return False diff --git a/python/ray/data/_internal/execution/operators/limit_operator.py b/python/ray/data/_internal/execution/operators/limit_operator.py index 47d1079613a5..a557762ec69a 100644 --- a/python/ray/data/_internal/execution/operators/limit_operator.py +++ b/python/ray/data/_internal/execution/operators/limit_operator.py @@ -115,9 +115,9 @@ def num_outputs_total(self) -> Optional[int]: if self._execution_completed: return self._cur_output_bundles return self._estimated_num_output_bundles - + def num_output_rows_total(self) -> Optional[int]: - # The total number of rows is simply the limit or the number + # The total number of rows is simply the limit or the number # of input rows, whichever is smaller input_num_rows = self.input_dependencies[0].num_output_rows_total() if input_num_rows is None: diff --git a/python/ray/data/_internal/execution/operators/output_splitter.py b/python/ray/data/_internal/execution/operators/output_splitter.py index 69eed4ef2eaf..f5a9b6c55d84 100644 --- a/python/ray/data/_internal/execution/operators/output_splitter.py +++ b/python/ray/data/_internal/execution/operators/output_splitter.py @@ -73,12 +73,11 @@ def num_outputs_total(self) -> Optional[int]: # OutputSplitter does not change the number of blocks, # so we can return the number of blocks from the input op. return self.input_dependencies[0].num_outputs_total() - + def num_output_rows_total(self) -> Optional[int]: # The total number of rows is the same as the number of input rows. return self.input_dependencies[0].num_output_rows_total() - def start(self, options: ExecutionOptions) -> None: super().start(options) # Force disable locality optimization. diff --git a/python/ray/data/_internal/execution/operators/zip_operator.py b/python/ray/data/_internal/execution/operators/zip_operator.py index df8d90dfd291..ed1abf31a5fa 100644 --- a/python/ray/data/_internal/execution/operators/zip_operator.py +++ b/python/ray/data/_internal/execution/operators/zip_operator.py @@ -52,7 +52,7 @@ def num_outputs_total(self) -> Optional[int]: return left_num_outputs else: return right_num_outputs - + def num_output_rows_total(self) -> Optional[int]: left_num_rows = self.input_dependencies[0].num_output_rows_total() right_num_rows = self.input_dependencies[1].num_output_rows_total() @@ -63,7 +63,6 @@ def num_output_rows_total(self) -> Optional[int]: else: return right_num_rows - def _add_input_inner(self, refs: RefBundle, input_index: int) -> None: assert not self.completed() assert input_index == 0 or input_index == 1, input_index diff --git a/python/ray/data/_internal/progress_bar.py b/python/ray/data/_internal/progress_bar.py index 5e35965d0b1f..b33aba5781df 100644 --- a/python/ray/data/_internal/progress_bar.py +++ b/python/ray/data/_internal/progress_bar.py @@ -3,6 +3,7 @@ import ray from ray.experimental import tqdm_ray +from ray.experimental.tqdm_ray import format_num from ray.types import ObjectRef from ray.util.annotations import Deprecated @@ -18,6 +19,7 @@ _canceled_threads = set() _canceled_threads_lock = threading.Lock() + @Deprecated def set_progress_bars(enabled: bool) -> bool: """Set whether progress bars are enabled. @@ -118,16 +120,25 @@ def fetch_until_complete(self, refs: List[ObjectRef]) -> List[Any]: def set_description(self, name: str) -> None: if self._bar and name != self._desc: self._desc = name - self._bar.set_description(self._desc) + formatted_progress = format_num(self._progress) + formatted_total = ( + format_num(self._bar.total) if self._bar.total is not None else "??" + ) + self._bar.set_description( + f"{self._desc} {formatted_progress}/{formatted_total}" + ) def update(self, i: int = 0, total: Optional[int] = None) -> None: if self._bar and (i != 0 or self._bar.total != total): self._progress += i if total is not None: self._bar.total = total - if self._bar.total is not None and self._progress > self._bar.total: - # If the progress goes over 100%, update the total. - self._bar.total = self._progress + formatted_total = format_num(self._bar.total) + formatted_progress = format_num(self._progress) + self._bar.set_description( + f"{self._desc} {formatted_progress} \ + /{formatted_total if total is not None else '??'}" + ) self._bar.update(i) def close(self): diff --git a/python/ray/experimental/tqdm_ray.py b/python/ray/experimental/tqdm_ray.py index 8015a8fa5aed..32c8a155ef66 100644 --- a/python/ray/experimental/tqdm_ray.py +++ b/python/ray/experimental/tqdm_ray.py @@ -51,6 +51,13 @@ def safe_print(*args, **kwargs): instance().unhide_bars() +def format_num(n): + """Intelligent scientific notation (.3g).""" + f = f"{n:.3g}".replace("e+0", "e+").replace("e-0", "e-") + n = str(n) + return f if len(f) < len(n) else n + + class tqdm: """Experimental: Ray distributed tqdm implementation. @@ -99,7 +106,8 @@ def __init__( def set_description(self, desc): """Implements tqdm.tqdm.set_description.""" - self._desc = desc + self._desc = f"{desc} ({format_num(self._x)}\ + /{format_num(self._total) if self._total else '??'})" self._dump_state() def update(self, n=1): @@ -139,11 +147,13 @@ def _dump_state(self, force_flush=False) -> None: instance().process_state_update(copy.deepcopy(self._get_state())) def _get_state(self) -> ProgressBarState: + """Get the formatted state of the progress bar.""" return { "__magic_token__": RAY_TQDM_MAGIC, "x": self._x, "pos": self._pos, - "desc": self._desc, + "desc": f"{self._desc} {format_num(self._x)}\ + /{format_num(self._total) if self._total else '??'}", "total": self._total, "unit": self._unit, "ip": self._ip, From b57fea4cc431cfd0bc9d7acbd48241a01a486475 Mon Sep 17 00:00:00 2001 From: zhilong Date: Thu, 25 Jul 2024 15:45:41 -0400 Subject: [PATCH 10/13] fix default Signed-off-by: zhilong --- python/ray/data/_internal/progress_bar.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/_internal/progress_bar.py b/python/ray/data/_internal/progress_bar.py index b33aba5781df..ebe1543b2e8a 100644 --- a/python/ray/data/_internal/progress_bar.py +++ b/python/ray/data/_internal/progress_bar.py @@ -71,7 +71,7 @@ def __init__( self._bar = tqdm_ray.tqdm(total=total, unit=unit, position=position) else: self._bar = tqdm.tqdm( - total=total, + total=total or 0, # Fix currupt if use default position=position, dynamic_ncols=True, unit=unit, From 190e38ee472b561c5e77e9b278a35d7116e8911b Mon Sep 17 00:00:00 2001 From: zhilong Date: Thu, 25 Jul 2024 16:39:08 -0400 Subject: [PATCH 11/13] rever changed for progress bar Signed-off-by: zhilong --- python/ray/data/_internal/progress_bar.py | 22 ++++++---------------- python/ray/experimental/tqdm_ray.py | 16 +++------------- 2 files changed, 9 insertions(+), 29 deletions(-) diff --git a/python/ray/data/_internal/progress_bar.py b/python/ray/data/_internal/progress_bar.py index ebe1543b2e8a..63c087c6172a 100644 --- a/python/ray/data/_internal/progress_bar.py +++ b/python/ray/data/_internal/progress_bar.py @@ -3,7 +3,6 @@ import ray from ray.experimental import tqdm_ray -from ray.experimental.tqdm_ray import format_num from ray.types import ObjectRef from ray.util.annotations import Deprecated @@ -71,7 +70,7 @@ def __init__( self._bar = tqdm_ray.tqdm(total=total, unit=unit, position=position) else: self._bar = tqdm.tqdm( - total=total or 0, # Fix currupt if use default + total=total, position=position, dynamic_ncols=True, unit=unit, @@ -120,25 +119,16 @@ def fetch_until_complete(self, refs: List[ObjectRef]) -> List[Any]: def set_description(self, name: str) -> None: if self._bar and name != self._desc: self._desc = name - formatted_progress = format_num(self._progress) - formatted_total = ( - format_num(self._bar.total) if self._bar.total is not None else "??" - ) - self._bar.set_description( - f"{self._desc} {formatted_progress}/{formatted_total}" - ) + self._bar.set_description(self._desc) def update(self, i: int = 0, total: Optional[int] = None) -> None: if self._bar and (i != 0 or self._bar.total != total): self._progress += i if total is not None: self._bar.total = total - formatted_total = format_num(self._bar.total) - formatted_progress = format_num(self._progress) - self._bar.set_description( - f"{self._desc} {formatted_progress} \ - /{formatted_total if total is not None else '??'}" - ) + if self._bar.total is not None and self._progress > self._bar.total: + # If the progress goes over 100%, update the total. + self._bar.total = self._progress self._bar.update(i) def close(self): @@ -157,4 +147,4 @@ def __getstate__(self): return {} def __setstate__(self, state): - self._bar = None # Progress bar is disabled on remote nodes. + self._bar = None # Progress bar is disabled on remote nodes. \ No newline at end of file diff --git a/python/ray/experimental/tqdm_ray.py b/python/ray/experimental/tqdm_ray.py index 32c8a155ef66..bbbef68719a3 100644 --- a/python/ray/experimental/tqdm_ray.py +++ b/python/ray/experimental/tqdm_ray.py @@ -51,13 +51,6 @@ def safe_print(*args, **kwargs): instance().unhide_bars() -def format_num(n): - """Intelligent scientific notation (.3g).""" - f = f"{n:.3g}".replace("e+0", "e+").replace("e-0", "e-") - n = str(n) - return f if len(f) < len(n) else n - - class tqdm: """Experimental: Ray distributed tqdm implementation. @@ -106,8 +99,7 @@ def __init__( def set_description(self, desc): """Implements tqdm.tqdm.set_description.""" - self._desc = f"{desc} ({format_num(self._x)}\ - /{format_num(self._total) if self._total else '??'})" + self._desc = desc self._dump_state() def update(self, n=1): @@ -147,13 +139,11 @@ def _dump_state(self, force_flush=False) -> None: instance().process_state_update(copy.deepcopy(self._get_state())) def _get_state(self) -> ProgressBarState: - """Get the formatted state of the progress bar.""" return { "__magic_token__": RAY_TQDM_MAGIC, "x": self._x, "pos": self._pos, - "desc": f"{self._desc} {format_num(self._x)}\ - /{format_num(self._total) if self._total else '??'}", + "desc": self._desc, "total": self._total, "unit": self._unit, "ip": self._ip, @@ -415,4 +405,4 @@ def sleep(x): processing.remote(0.01), processing.remote(0.05), ] - ) + ) \ No newline at end of file From bcafb6ea53c07f51886503344a86a738114f80b5 Mon Sep 17 00:00:00 2001 From: zhilong Date: Thu, 25 Jul 2024 16:40:04 -0400 Subject: [PATCH 12/13] rever changed for progress bar Signed-off-by: zhilong --- python/ray/data/_internal/progress_bar.py | 2 +- python/ray/experimental/tqdm_ray.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/data/_internal/progress_bar.py b/python/ray/data/_internal/progress_bar.py index 63c087c6172a..e117dc1be481 100644 --- a/python/ray/data/_internal/progress_bar.py +++ b/python/ray/data/_internal/progress_bar.py @@ -147,4 +147,4 @@ def __getstate__(self): return {} def __setstate__(self, state): - self._bar = None # Progress bar is disabled on remote nodes. \ No newline at end of file + self._bar = None # Progress bar is disabled on remote nodes. diff --git a/python/ray/experimental/tqdm_ray.py b/python/ray/experimental/tqdm_ray.py index bbbef68719a3..8015a8fa5aed 100644 --- a/python/ray/experimental/tqdm_ray.py +++ b/python/ray/experimental/tqdm_ray.py @@ -405,4 +405,4 @@ def sleep(x): processing.remote(0.01), processing.remote(0.05), ] - ) \ No newline at end of file + ) From 7ffe6650e8a3120076c90bb668e017d930acb408 Mon Sep 17 00:00:00 2001 From: zhilong Date: Thu, 1 Aug 2024 15:46:37 -0400 Subject: [PATCH 13/13] add assert Signed-off-by: zhilong --- .../ray/data/_internal/execution/streaming_executor_state.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index 38800fa4b0d8..fff7c48d3bd1 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -245,6 +245,9 @@ def add_output(self, ref: RefBundle) -> None: self.outqueue.append(ref) self.num_completed_tasks += 1 if self.progress_bar: + assert ( + ref.num_rows() is not None + ), "RefBundle must have a valid number of rows" self.progress_bar.update(ref.num_rows(), self.op.num_output_rows_total()) def refresh_progress_bar(self, resource_manager: ResourceManager) -> None: