[Data] [Dashboard] Fix Rows Outputted being zero on Ray Data Dashboard #48745

Merged: 13 commits on Nov 15, 2024
8 changes: 3 additions & 5 deletions python/ray/data/_internal/execution/streaming_executor.py
@@ -188,11 +188,9 @@ def shutdown(self, execution_completed: bool = True):
state="FINISHED" if execution_completed else "FAILED",
force_update=True,
)
-            # Clears metrics for this dataset so that they do
-            # not persist in the grafana dashboard after execution
-            StatsManager.clear_execution_metrics(
-                self._dataset_tag, self._get_operator_tags()
-            )
+            # Once Dataset execution completes, mark it as complete
+            # and remove last cached execution stats.
+            StatsManager.clear_last_execution_stats(self._dataset_tag)
# Freeze the stats and save it.
self._final_stats = self._generate_stats()
stats_summary_string = self._final_stats.to_summary().to_string(
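
A note on the mechanism behind the fix: the removed _StatsActor.clear_execution_metrics() reset every per-operator gauge (including output_rows) to zero when a dataset finished, so the last sample scraped by Prometheus, and therefore the "Rows Outputted" panel on the Grafana-backed Ray Data dashboard, read 0. Below is a minimal sketch of that effect using the public ray.util.metrics.Gauge API; the gauge name, tag keys, and values are illustrative stand-ins, not the actual Ray Data internals.

import ray
from ray.util import metrics

ray.init()

# Illustrative gauge standing in for _StatsActor.output_rows.
output_rows = metrics.Gauge(
    "example_data_output_rows",
    description="Rows produced by an operator (example only).",
    tag_keys=("dataset", "operator"),
)
tags = {"dataset": "dataset_abc", "operator": "ReadRange0"}

# While the dataset runs, the real row count is exported.
output_rows.set(1_000_000, tags)

# Old behavior: clearing metrics at shutdown overwrote that value, so the
# dashboard's last scraped sample showed 0 rows.
output_rows.set(0, tags)

# New behavior: the gauge keeps its final value; only the driver-side
# _last_execution_stats cache entry is removed (see stats.py below).
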
39 changes: 3 additions & 36 deletions python/ray/data/_internal/stats.py
@@ -378,33 +378,6 @@ def update_iteration_metrics(
self.iter_user_s.set(stats.iter_user_s.get(), tags)
self.iter_initialize_s.set(stats.iter_initialize_s.get(), tags)

-    def clear_execution_metrics(self, dataset_tag: str, operator_tags: List[str]):
-        for operator_tag in operator_tags:
-            tags = self._create_tags(dataset_tag, operator_tag)
-            self.spilled_bytes.set(0, tags)
-            self.allocated_bytes.set(0, tags)
-            self.freed_bytes.set(0, tags)
-            self.current_bytes.set(0, tags)
-            self.output_bytes.set(0, tags)
-            self.output_rows.set(0, tags)
-            self.cpu_usage_cores.set(0, tags)
-            self.gpu_usage_cores.set(0, tags)
-
-            for prom_metric in self.execution_metrics_inputs.values():
-                prom_metric.set(0, tags)
-
-            for prom_metric in self.execution_metrics_outputs.values():
-                prom_metric.set(0, tags)
-
-            for prom_metric in self.execution_metrics_tasks.values():
-                prom_metric.set(0, tags)
-
-            for prom_metric in self.execution_metrics_obj_store_memory.values():
-                prom_metric.set(0, tags)
-
-            for prom_metric in self.execution_metrics_misc.values():
-                prom_metric.set(0, tags)
-
def register_dataset(self, job_id: str, dataset_tag: str, operator_tags: List[str]):
self.datasets[dataset_tag] = {
"job_id": job_id,
@@ -593,19 +566,13 @@ def update_execution_metrics(
self._last_execution_stats[dataset_tag] = args
self._start_thread_if_not_running()

-    def clear_execution_metrics(self, dataset_tag: str, operator_tags: List[str]):
+    def clear_last_execution_stats(self, dataset_tag: str):
+        # After dataset completes execution, remove cached execution stats.
+        # Marks the dataset as finished on job page's Ray Data Overview.
with self._stats_lock:
if dataset_tag in self._last_execution_stats:
del self._last_execution_stats[dataset_tag]

-        try:
-            self._stats_actor(
-                create_if_not_exists=False
-            ).clear_execution_metrics.remote(dataset_tag, operator_tags)
-        except Exception:
-            # Cluster may be shut down.
-            pass

# Iteration methods

def update_iteration_metrics(self, stats: "DatasetStats", dataset_tag: str):
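
For context on the replacement, the new StatsManager.clear_last_execution_stats() only drops the driver-side cache entry under the stats lock and no longer asks the stats actor to zero its gauges. A standalone sketch of that pattern follows; ExecutionStatsCache and its method names are hypothetical stand-ins, not the real StatsManager.

import threading
from typing import Any, Dict


class ExecutionStatsCache:
    """Hypothetical stand-in for StatsManager's per-dataset stats cache."""

    def __init__(self) -> None:
        self._stats_lock = threading.Lock()
        self._last_execution_stats: Dict[str, Any] = {}

    def update(self, dataset_tag: str, stats: Any) -> None:
        with self._stats_lock:
            self._last_execution_stats[dataset_tag] = stats

    def clear_last_execution_stats(self, dataset_tag: str) -> None:
        # Dropping the cached entry marks the dataset as finished on the job
        # page without resetting any exported Prometheus gauges.
        with self._stats_lock:
            self._last_execution_stats.pop(dataset_tag, None)


cache = ExecutionStatsCache()
cache.update("dataset_abc", {"output_rows": 1_000_000})
cache.clear_last_execution_stats("dataset_abc")
assert "dataset_abc" not in cache._last_execution_stats
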
9 changes: 4 additions & 5 deletions python/ray/data/tests/test_stats.py
@@ -1663,8 +1663,9 @@ def test_stats_manager(shutdown_only):
datasets = [None] * num_threads
# Mock clear methods so that _last_execution_stats and _last_iteration_stats
# are not cleared. We will assert on them afterwards.
-    with patch.object(StatsManager, "clear_execution_metrics"), patch.object(
-        StatsManager, "clear_iteration_metrics"
+    with (
+        patch.object(StatsManager, "clear_last_execution_stats"),
+        patch.object(StatsManager, "clear_iteration_metrics"),
):

def update_stats_manager(i):
@@ -1689,9 +1690,7 @@ def update_stats_manager(i):
dataset_tag = create_dataset_tag(dataset._name, dataset._uuid)
assert dataset_tag in StatsManager._last_execution_stats
assert dataset_tag in StatsManager._last_iteration_stats
-        StatsManager.clear_execution_metrics(
-            dataset_tag, ["Input0", "ReadRange->MapBatches(<lambda>)1"]
-        )
+        StatsManager.clear_last_execution_stats(dataset_tag)
StatsManager.clear_iteration_metrics(dataset_tag)

wait_for_condition(lambda: not StatsManager._update_thread.is_alive())
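
The test changes follow the rename: the patched method is now clear_last_execution_stats, and the two patch.object() calls are grouped with parenthesized context managers (supported in Python 3.10+). A small self-contained sketch of that patching pattern is below; FakeStatsManager is a hypothetical stand-in used only to show the mechanics, not the real StatsManager or test fixture.

from unittest.mock import patch


class FakeStatsManager:
    """Hypothetical stand-in with the same clearing behavior as StatsManager."""

    _last_execution_stats = {}
    _last_iteration_stats = {}

    @classmethod
    def clear_last_execution_stats(cls, dataset_tag: str) -> None:
        cls._last_execution_stats.pop(dataset_tag, None)

    @classmethod
    def clear_iteration_metrics(cls, dataset_tag: str) -> None:
        cls._last_iteration_stats.pop(dataset_tag, None)


FakeStatsManager._last_execution_stats["dataset_abc"] = object()

# While patched, the clear methods are MagicMocks, so cached entries survive
# and can be asserted on afterwards, as in test_stats_manager.
with (
    patch.object(FakeStatsManager, "clear_last_execution_stats"),
    patch.object(FakeStatsManager, "clear_iteration_metrics"),
):
    FakeStatsManager.clear_last_execution_stats("dataset_abc")

assert "dataset_abc" in FakeStatsManager._last_execution_stats

# Outside the block the original classmethod is restored and actually clears.
FakeStatsManager.clear_last_execution_stats("dataset_abc")
assert "dataset_abc" not in FakeStatsManager._last_execution_stats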