Skip to content

Commit

Permalink
[Data] [Dashboard] Fix Rows Outputted being zero on Ray Data Dashbo…
Browse files Browse the repository at this point in the history
…ard (#48745)

## Why are these changes needed?

Currently, there are some cases where the `Rows Outputted` value on the
Ray Job page's `Ray Data Overview` section says "0", even after the
dataset execution completes. The root cause of the bug is that we clear
iteration/execution metrics after the dataset completes. This was
previously used to "reset" the metrics to 0 after dataset completion, so
that the last emitted value would not persist on the dashboard, even
after the job finishes. Now that we display rates on the dashboard, this
hack is no longer needed, and we can skip the metrics clearing.

Fixed result:
<img width="1860" alt="Screenshot at Nov 14 12-11-24"
src="https://github.com/user-attachments/assets/35061b3f-9359-412b-8ab2-f4bcce412994">

## Related issue number

Closes #44635

## Checks

- [x] I've signed off every commit(by using the -s flag, i.e., `git
commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [x] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [x] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: Scott Lee <sjl@anyscale.com>
  • Loading branch information
scottjlee authored Nov 15, 2024
1 parent 196a0ca commit 351d780
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 46 deletions.
8 changes: 3 additions & 5 deletions python/ray/data/_internal/execution/streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,9 @@ def shutdown(self, execution_completed: bool = True):
state="FINISHED" if execution_completed else "FAILED",
force_update=True,
)
# Clears metrics for this dataset so that they do
# not persist in the grafana dashboard after execution
StatsManager.clear_execution_metrics(
self._dataset_tag, self._get_operator_tags()
)
# Once Dataset execution completes, mark it as complete
# and remove last cached execution stats.
StatsManager.clear_last_execution_stats(self._dataset_tag)
# Freeze the stats and save it.
self._final_stats = self._generate_stats()
stats_summary_string = self._final_stats.to_summary().to_string(
Expand Down
39 changes: 3 additions & 36 deletions python/ray/data/_internal/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,33 +378,6 @@ def update_iteration_metrics(
self.iter_user_s.set(stats.iter_user_s.get(), tags)
self.iter_initialize_s.set(stats.iter_initialize_s.get(), tags)

def clear_execution_metrics(self, dataset_tag: str, operator_tags: List[str]):
for operator_tag in operator_tags:
tags = self._create_tags(dataset_tag, operator_tag)
self.spilled_bytes.set(0, tags)
self.allocated_bytes.set(0, tags)
self.freed_bytes.set(0, tags)
self.current_bytes.set(0, tags)
self.output_bytes.set(0, tags)
self.output_rows.set(0, tags)
self.cpu_usage_cores.set(0, tags)
self.gpu_usage_cores.set(0, tags)

for prom_metric in self.execution_metrics_inputs.values():
prom_metric.set(0, tags)

for prom_metric in self.execution_metrics_outputs.values():
prom_metric.set(0, tags)

for prom_metric in self.execution_metrics_tasks.values():
prom_metric.set(0, tags)

for prom_metric in self.execution_metrics_obj_store_memory.values():
prom_metric.set(0, tags)

for prom_metric in self.execution_metrics_misc.values():
prom_metric.set(0, tags)

def register_dataset(self, job_id: str, dataset_tag: str, operator_tags: List[str]):
self.datasets[dataset_tag] = {
"job_id": job_id,
Expand Down Expand Up @@ -593,19 +566,13 @@ def update_execution_metrics(
self._last_execution_stats[dataset_tag] = args
self._start_thread_if_not_running()

def clear_execution_metrics(self, dataset_tag: str, operator_tags: List[str]):
def clear_last_execution_stats(self, dataset_tag: str):
# After dataset completes execution, remove cached execution stats.
# Marks the dataset as finished on job page's Ray Data Overview.
with self._stats_lock:
if dataset_tag in self._last_execution_stats:
del self._last_execution_stats[dataset_tag]

try:
self._stats_actor(
create_if_not_exists=False
).clear_execution_metrics.remote(dataset_tag, operator_tags)
except Exception:
# Cluster may be shut down.
pass

# Iteration methods

def update_iteration_metrics(self, stats: "DatasetStats", dataset_tag: str):
Expand Down
9 changes: 4 additions & 5 deletions python/ray/data/tests/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -1663,8 +1663,9 @@ def test_stats_manager(shutdown_only):
datasets = [None] * num_threads
# Mock clear methods so that _last_execution_stats and _last_iteration_stats
# are not cleared. We will assert on them afterwards.
with patch.object(StatsManager, "clear_execution_metrics"), patch.object(
StatsManager, "clear_iteration_metrics"
with (
patch.object(StatsManager, "clear_last_execution_stats"),
patch.object(StatsManager, "clear_iteration_metrics"),
):

def update_stats_manager(i):
Expand All @@ -1689,9 +1690,7 @@ def update_stats_manager(i):
dataset_tag = create_dataset_tag(dataset._name, dataset._uuid)
assert dataset_tag in StatsManager._last_execution_stats
assert dataset_tag in StatsManager._last_iteration_stats
StatsManager.clear_execution_metrics(
dataset_tag, ["Input0", "ReadRange->MapBatches(<lambda>)1"]
)
StatsManager.clear_last_execution_stats(dataset_tag)
StatsManager.clear_iteration_metrics(dataset_tag)

wait_for_condition(lambda: not StatsManager._update_thread.is_alive())
Expand Down

0 comments on commit 351d780

Please sign in to comment.