From 962fac1780eff87cdfdb34123c186e59aae52278 Mon Sep 17 00:00:00 2001
From: Scott Lee
Date: Fri, 15 Nov 2024 14:06:02 -0800
Subject: [PATCH] [Data] [Dashboard] Fix `Rows Outputted` being zero on Ray Data Dashboard (#48745)

## Why are these changes needed?

Currently, there are some cases where the `Rows Outputted` value on the Ray Job page's `Ray Data Overview` section says "0", even after the dataset execution completes.

The root cause of the bug is that we clear iteration/execution metrics after the dataset completes. This was previously used to "reset" the metrics to 0 after dataset completion, so that the last emitted value would not persist on the dashboard after the job finishes. Now that we display rates on the dashboard, this hack is no longer needed, and we can skip the metrics clearing.
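To make the root cause concrete: the dashboard panel renders the last value reported for a gauge-style metric, so zeroing that metric at completion makes a finished dataset read "0". The sketch below is illustrative only; `Gauge`, `output_rows`, and `run_dataset` are hypothetical stand-ins, not Ray's actual metrics API:

```python
# Minimal sketch of the old clearing behavior. All names here are
# hypothetical; this is not Ray's metrics code.


class Gauge:
    """A last-value-wins metric, like the gauges behind the dashboard."""

    def __init__(self) -> None:
        self.value = 0

    def set(self, value: int) -> None:
        self.value = value


output_rows = Gauge()


def run_dataset(clear_on_completion: bool) -> int:
    # During execution, the executor periodically reports cumulative progress.
    for rows_so_far in (1_000, 5_000, 10_000):
        output_rows.set(rows_so_far)

    if clear_on_completion:
        # Old behavior: zero the gauge so stale values don't linger after
        # the job. A panel that renders the last reported value now shows 0,
        # which is the bug described above.
        output_rows.set(0)

    return output_rows.value


assert run_dataset(clear_on_completion=True) == 0  # dashboard reads "0"
assert run_dataset(clear_on_completion=False) == 10_000  # dashboard reads the total
```

With the clearing removed, the final reported value simply persists after completion, which is what the overview panel should show for a finished dataset.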
Fixed result: *(screenshot of the Ray Data Overview after the fix, showing a non-zero `Rows Outputted` value)*

## Related issue number

Closes https://github.com/ray-project/ray/issues/44635

## Checks

- [x] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [x] I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
  - [x] Unit tests
  - [ ] Release tests
  - [ ] This PR is not tested :(

---------

Signed-off-by: Scott Lee
Signed-off-by: hjiang
---
 .../_internal/execution/streaming_executor.py |  8 ++--
 python/ray/data/_internal/stats.py            | 39 ++-----------------
 python/ray/data/tests/test_stats.py           |  9 ++---
 3 files changed, 10 insertions(+), 46 deletions(-)

diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py
index 238f6f9421cc..ca48d7766c35 100644
--- a/python/ray/data/_internal/execution/streaming_executor.py
+++ b/python/ray/data/_internal/execution/streaming_executor.py
@@ -188,11 +188,9 @@ def shutdown(self, execution_completed: bool = True):
                 state="FINISHED" if execution_completed else "FAILED",
                 force_update=True,
             )
-            # Clears metrics for this dataset so that they do
-            # not persist in the grafana dashboard after execution
-            StatsManager.clear_execution_metrics(
-                self._dataset_tag, self._get_operator_tags()
-            )
+            # Once Dataset execution completes, mark it as complete
+            # and remove last cached execution stats.
+            StatsManager.clear_last_execution_stats(self._dataset_tag)
             # Freeze the stats and save it.
             self._final_stats = self._generate_stats()
             stats_summary_string = self._final_stats.to_summary().to_string(
diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py
index 46435ec9ceb4..fc6903cd92e2 100644
--- a/python/ray/data/_internal/stats.py
+++ b/python/ray/data/_internal/stats.py
@@ -378,33 +378,6 @@ def update_iteration_metrics(
         self.iter_user_s.set(stats.iter_user_s.get(), tags)
         self.iter_initialize_s.set(stats.iter_initialize_s.get(), tags)
 
-    def clear_execution_metrics(self, dataset_tag: str, operator_tags: List[str]):
-        for operator_tag in operator_tags:
-            tags = self._create_tags(dataset_tag, operator_tag)
-            self.spilled_bytes.set(0, tags)
-            self.allocated_bytes.set(0, tags)
-            self.freed_bytes.set(0, tags)
-            self.current_bytes.set(0, tags)
-            self.output_bytes.set(0, tags)
-            self.output_rows.set(0, tags)
-            self.cpu_usage_cores.set(0, tags)
-            self.gpu_usage_cores.set(0, tags)
-
-            for prom_metric in self.execution_metrics_inputs.values():
-                prom_metric.set(0, tags)
-
-            for prom_metric in self.execution_metrics_outputs.values():
-                prom_metric.set(0, tags)
-
-            for prom_metric in self.execution_metrics_tasks.values():
-                prom_metric.set(0, tags)
-
-            for prom_metric in self.execution_metrics_obj_store_memory.values():
-                prom_metric.set(0, tags)
-
-            for prom_metric in self.execution_metrics_misc.values():
-                prom_metric.set(0, tags)
-
     def register_dataset(self, job_id: str, dataset_tag: str, operator_tags: List[str]):
         self.datasets[dataset_tag] = {
             "job_id": job_id,
@@ -593,19 +566,13 @@ def update_execution_metrics(
             self._last_execution_stats[dataset_tag] = args
         self._start_thread_if_not_running()
 
-    def clear_execution_metrics(self, dataset_tag: str, operator_tags: List[str]):
+    def clear_last_execution_stats(self, dataset_tag: str):
+        # After dataset completes execution, remove cached execution stats.
+        # Marks the dataset as finished on job page's Ray Data Overview.
         with self._stats_lock:
             if dataset_tag in self._last_execution_stats:
                 del self._last_execution_stats[dataset_tag]
 
-        try:
-            self._stats_actor(
-                create_if_not_exists=False
-            ).clear_execution_metrics.remote(dataset_tag, operator_tags)
-        except Exception:
-            # Cluster may be shut down.
-            pass
-
     # Iteration methods
 
     def update_iteration_metrics(self, stats: "DatasetStats", dataset_tag: str):
diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py
index d8d85515092c..cb0d31f22774 100644
--- a/python/ray/data/tests/test_stats.py
+++ b/python/ray/data/tests/test_stats.py
@@ -1663,8 +1663,9 @@ def test_stats_manager(shutdown_only):
     datasets = [None] * num_threads
     # Mock clear methods so that _last_execution_stats and _last_iteration_stats
    # are not cleared. We will assert on them afterwards.
-    with patch.object(StatsManager, "clear_execution_metrics"), patch.object(
-        StatsManager, "clear_iteration_metrics"
+    with (
+        patch.object(StatsManager, "clear_last_execution_stats"),
+        patch.object(StatsManager, "clear_iteration_metrics"),
     ):
 
         def update_stats_manager(i):
@@ -1689,9 +1690,7 @@ def update_stats_manager(i):
         dataset_tag = create_dataset_tag(dataset._name, dataset._uuid)
         assert dataset_tag in StatsManager._last_execution_stats
         assert dataset_tag in StatsManager._last_iteration_stats
-        StatsManager.clear_execution_metrics(
-            dataset_tag, ["Input0", "ReadRange->MapBatches()1"]
-        )
+        StatsManager.clear_last_execution_stats(dataset_tag)
         StatsManager.clear_iteration_metrics(dataset_tag)
 
     wait_for_condition(lambda: not StatsManager._update_thread.is_alive())