diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index c3d7ae8e95dc..360fcc284919 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -307,8 +307,8 @@ def add_output(self, ref: RefBundle) -> None: self.op.metrics.num_restarting_actors = actor_info.restarting self.op.metrics.num_pending_actors = actor_info.pending for next_op in self.op.output_dependencies: - next_op.metrics.num_external_inqueue_blocks = self.output_queue.num_blocks - next_op.metrics.num_external_inqueue_bytes = self.output_queue.memory_usage + next_op.metrics.num_external_inqueue_blocks += len(ref.blocks) + next_op.metrics.num_external_inqueue_bytes += ref.size_bytes() def refresh_progress_bar(self, resource_manager: ResourceManager) -> None: """Update the console with the latest operator progress.""" @@ -353,6 +353,8 @@ def dispatch_next_task(self) -> None: ref = inqueue.pop() if ref is not None: self.op.add_input(ref, input_index=i) + self.op.metrics.num_external_inqueue_bytes -= ref.size_bytes() + self.op.metrics.num_external_inqueue_blocks -= len(ref.blocks) return assert False, "Nothing to dispatch" diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index 07cb5c359fd2..4d0f1613583b 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -103,8 +103,8 @@ def gen_expected_metrics( "'num_outputs_of_finished_tasks': N", "'bytes_outputs_of_finished_tasks': N", "'rows_outputs_of_finished_tasks': N", - "'num_external_inqueue_blocks': N", - "'num_external_inqueue_bytes': N", + "'num_external_inqueue_blocks': Z", + "'num_external_inqueue_bytes': Z", "'num_tasks_submitted': N", "'num_tasks_running': Z", "'num_tasks_have_outputs': N", @@ -166,8 +166,8 @@ def gen_expected_metrics( "'num_outputs_of_finished_tasks': Z", "'bytes_outputs_of_finished_tasks': Z", "'rows_outputs_of_finished_tasks': Z", - "'num_external_inqueue_blocks': N", - "'num_external_inqueue_bytes': N", + "'num_external_inqueue_blocks': Z", + "'num_external_inqueue_bytes': Z", "'num_tasks_submitted': Z", "'num_tasks_running': Z", "'num_tasks_have_outputs': Z", @@ -710,8 +710,8 @@ def test_dataset__repr__(ray_start_regular_shared, restore_data_context): " num_outputs_of_finished_tasks: N,\n" " bytes_outputs_of_finished_tasks: N,\n" " rows_outputs_of_finished_tasks: N,\n" - " num_external_inqueue_blocks: N,\n" - " num_external_inqueue_bytes: N,\n" + " num_external_inqueue_blocks: Z,\n" + " num_external_inqueue_bytes: Z,\n" " num_tasks_submitted: N,\n" " num_tasks_running: Z,\n" " num_tasks_have_outputs: N,\n" @@ -842,8 +842,8 @@ def check_stats(): " num_outputs_of_finished_tasks: N,\n" " bytes_outputs_of_finished_tasks: N,\n" " rows_outputs_of_finished_tasks: N,\n" - " num_external_inqueue_blocks: N,\n" - " num_external_inqueue_bytes: N,\n" + " num_external_inqueue_blocks: Z,\n" + " num_external_inqueue_bytes: Z,\n" " num_tasks_submitted: N,\n" " num_tasks_running: Z,\n" " num_tasks_have_outputs: N,\n" @@ -929,8 +929,8 @@ def check_stats(): " num_outputs_of_finished_tasks: N,\n" " bytes_outputs_of_finished_tasks: N,\n" " rows_outputs_of_finished_tasks: N,\n" - " num_external_inqueue_blocks: N,\n" - " num_external_inqueue_bytes: N,\n" + " num_external_inqueue_blocks: Z,\n" + " num_external_inqueue_bytes: Z,\n" " num_tasks_submitted: N,\n" " num_tasks_running: Z,\n" " num_tasks_have_outputs: N,\n" @@ -1985,12 +1985,6 @@ def test_op_metrics_logging(): + gen_expected_metrics(is_map=False) ) # .replace("'obj_store_mem_used': N", "'obj_store_mem_used': Z") # InputDataBuffer has no inqueue, manually set to 0 - input_str = input_str.replace( - "'num_external_inqueue_blocks': N", "'num_external_inqueue_blocks': Z" - ) - input_str = input_str.replace( - "'num_external_inqueue_bytes': N", "'num_external_inqueue_bytes': Z" - ) map_str = ( "Operator TaskPoolMapOperator[ReadRange->MapBatches()] completed. " "Operator Metrics:\n"