Skip to content

Commit 08e9feb

Browse files
committed
attempt ray-project#1: total progress only
Signed-off-by: Daniel Shin <kyuseung1016@gmail.com>
1 parent 53a5c7b commit 08e9feb

File tree

7 files changed

+343
-198
lines changed

7 files changed

+343
-198
lines changed

python/ray/data/_internal/execution/operators/base_physical_operator.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ def get_transformation_fn(self) -> AllToAllTransformFn:
150150
def progress_str(self) -> str:
151151
return f"{self.num_output_rows_total() or 0} rows output"
152152

153+
# TODO (kyuds): remove
153154
def initialize_sub_progress_bars(self, position: int) -> int:
154155
"""Initialize all internal sub progress bars, and return the number of bars."""
155156
if self._sub_progress_bar_names is not None:
@@ -163,6 +164,7 @@ def initialize_sub_progress_bars(self, position: int) -> int:
163164
else:
164165
return 0
165166

167+
# TODO (kyuds): remove
166168
def close_sub_progress_bars(self):
167169
"""Close all internal sub progress bars."""
168170
if self._sub_progress_bar_dict is not None:

python/ray/data/_internal/execution/operators/hash_shuffle.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,7 @@ def shuffle_name(self) -> str:
374374
def reduce_name(self) -> str:
375375
...
376376

377+
# TODO (kyuds): remove
377378
def initialize_sub_progress_bars(self, position: int) -> int:
378379
"""Display all sub progres bars in the termainl, and return the number of bars."""
379380

@@ -398,6 +399,7 @@ def initialize_sub_progress_bars(self, position: int) -> int:
398399

399400
return progress_bars_created
400401

402+
# TODO (kyuds): remove
401403
def close_sub_progress_bars(self):
402404
"""Close all internal sub progress bars."""
403405
self.shuffle_bar.close()
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
import logging
2+
import math
3+
import time
4+
from enum import Enum
5+
from typing import List, Optional, Tuple
6+
7+
from ray.data._internal.execution.resource_manager import ResourceManager
8+
from ray.data._internal.execution.streaming_executor_state import Topology, OpState
9+
from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer
10+
from ray.util.debug import log_once
11+
12+
try:
13+
import rich
14+
from rich.console import Console
15+
from rich.live import Live
16+
from rich.progress import (
17+
Progress,
18+
TaskID,
19+
BarColumn,
20+
TextColumn,
21+
TimeRemainingColumn,
22+
FractionColumn,
23+
SpinnerColumn,
24+
)
25+
from rich.table import Table, Column
26+
from rich.text import Text
27+
28+
needs_rich_warning = True
29+
except ImportError:
30+
rich = None
31+
needs_rich_warning = False
32+
33+
logger = logging.getLogger(__name__)
34+
35+
_TREE_BRANCH = " ├─ "
36+
_TREE_VERTICAL = " │ "
37+
_TOTAL_PROGRESS_TOTAL = 1.0
38+
_RESOURCE_REPORT_HEADER = f"{_TREE_VERTICAL} Active/total resources: "
39+
40+
41+
class _ManagerMode(Enum):
42+
NONE = 1 # no-op
43+
GLOBAL_ONLY = 2 # global progress
44+
ALL = 3 # show everything
45+
46+
def show_op(self) -> bool:
47+
return self == self.ALL
48+
49+
def is_enabled(self) -> bool:
50+
return self != self.NONE
51+
52+
@classmethod
53+
def get_mode(cls) -> '_ManagerMode':
54+
from ray.data.context import DataContext
55+
56+
ctx = DataContext.get_current()
57+
if not ctx.enable_progress_bars:
58+
if log_once("ray_data_progress_manager_disabled"):
59+
logger.warning(
60+
"Progress bars disabled. To enable, set "
61+
"`ray.data.DataContext.get_current()."
62+
"enable_progress_bars = True`.")
63+
return cls.NONE
64+
elif rich is None:
65+
global needs_rich_warning
66+
if needs_rich_warning:
67+
print("[dataset]: Run `pip install rich` to enable "
68+
"execution progress reporting.")
69+
needs_rich_warning = False
70+
return cls.NONE
71+
elif not ctx.enable_operator_progress_bars:
72+
if log_once("ray_data_progress_manager_global"):
73+
logger.warning(
74+
"Progress bars for operators disabled. To enable, "
75+
"set `ray.data.DataContext.get_current()."
76+
"enable_operator_progress_bars = True`.")
77+
return cls.GLOBAL_ONLY
78+
else:
79+
return cls.ALL
80+
81+
82+
def _format_row_count(completed: int, total: Optional[int]) -> str:
83+
"""Formats row counts with k units."""
84+
85+
def format_k(val: int) -> str:
86+
if val >= 1000:
87+
fval = val / 1000.0
88+
fval_str = f"{int(fval)}" if fval.is_integer() else f"{fval:.2f}"
89+
return fval_str + "k"
90+
return str(val)
91+
92+
cstr = format_k(completed)
93+
if total is None or math.isinf(total):
94+
tstr = "?k" if cstr.endswith("k") else "?"
95+
else:
96+
tstr = format_k(total)
97+
return f"{cstr}/{tstr}"
98+
99+
100+
class RichExecutionProgressManager:
101+
"""Execution progress display using rich."""
102+
103+
def __init__(self, dataset_id: str, topology: Topology):
104+
self._mode = _ManagerMode.get_mode()
105+
self._dataset_id = dataset_id
106+
107+
if self._mode.is_enabled():
108+
self._start_time: Optional[float] = None
109+
110+
# rich
111+
self._console = Console()
112+
self._total = Progress(
113+
SpinnerColumn(finished_text=""),
114+
TextColumn("{task.description} {task.percentage:>3.0f}%", table_column=Column(no_wrap=True)),
115+
BarColumn(),
116+
TextColumn("{task.fields[count_str]}", table_column=Column(no_wrap=True)),
117+
TextColumn("["), TimeRemainingColumn(), TextColumn("]"),
118+
TextColumn("{task.fields[rate]}", table_column=Column(no_wrap=True)),
119+
console=self._console, transient=False, expand=True,
120+
)
121+
self._total_resources = Text(f"{_RESOURCE_REPORT_HEADER}Initializing...", no_wrap=True)
122+
# TODO (kyuds): op rows
123+
124+
self._layout_table = Table.grid(padding=(0, 1, 0, 0), expand=True)
125+
self._layout_table.add_row(self._total)
126+
self._layout_table.add_row(self._total_resources)
127+
self._live = Live(self._layout_table, console=self._console,
128+
refresh_per_second=2, vertical_overflow="visible")
129+
130+
self._total_task_id = self._total.add_task(
131+
f"Dataset {self._dataset_id} running:", total=_TOTAL_PROGRESS_TOTAL,
132+
rate="? rows/s", count_str="0/?")
133+
else:
134+
self._live = None
135+
136+
# Management
137+
def start(self):
138+
if self._mode.is_enabled():
139+
self._live.start()
140+
141+
def refresh(self):
142+
if self._mode.is_enabled():
143+
self._live.refresh()
144+
145+
def close(self):
146+
if self._mode.is_enabled():
147+
self.refresh()
148+
time.sleep(0.1)
149+
self._live.stop()
150+
151+
# Total Progress
152+
def _can_update_total(self) -> bool:
153+
return (self._mode.is_enabled() and
154+
self._total_task_id is not None and
155+
self._total_task_id in self._total.task_ids)
156+
157+
def update_total_progress(self, total_rows: Optional[int], current_rows: int):
158+
if not self._can_update_total():
159+
return
160+
161+
# Progress Report
162+
if self._start_time is None:
163+
self._start_time = time.time()
164+
165+
elapsed = time.time() - self._start_time
166+
rate_val = current_rows / elapsed if elapsed > 1 else 0
167+
rate_unit = "row/s"
168+
if rate_val >= 1000:
169+
rate_val /= 1000
170+
rate_unit = "k row/s"
171+
rate_str = f"{rate_val:.2f} {rate_unit}"
172+
173+
completed = 0.0
174+
if total_rows is not None and total_rows > 0:
175+
completed = min(1.0, current_rows / total_rows)
176+
177+
count_str = _format_row_count(current_rows, total_rows)
178+
self._total.update(
179+
self._total_task_id,
180+
completed=completed,
181+
total=_TOTAL_PROGRESS_TOTAL,
182+
fields={"rate": rate_str, "count_str": count_str},
183+
)
184+
185+
def update_resource_status(self, resource_manager: ResourceManager):
186+
# running_usage is the amount of resources that have been requested but
187+
# not necessarily available
188+
# TODO(sofian) https://github.com/ray-project/ray/issues/47520
189+
# We need to split the reported resources into running, pending-scheduling,
190+
# pending-node-assignment.
191+
if not self._can_update_total():
192+
return
193+
194+
running_usage = resource_manager.get_global_running_usage()
195+
pending_usage = resource_manager.get_global_pending_usage()
196+
limits = resource_manager.get_global_limits()
197+
198+
resource_usage = _RESOURCE_REPORT_HEADER
199+
resource_usage += f"{running_usage.cpu:.4g}/{limits.cpu:.4g} CPU, "
200+
if running_usage.gpu > 0:
201+
resources_status += f"{running_usage.gpu:.4g}/{limits.gpu:.4g} GPU, "
202+
resources_status += (
203+
f"{running_usage.object_store_memory_str()}/"
204+
f"{limits.object_store_memory_str()} object store"
205+
)
206+
207+
# Only include pending section when there are pending resources.
208+
if pending_usage.cpu or pending_usage.gpu:
209+
pending = []
210+
if pending_usage.cpu:
211+
pending.append(f"{pending_usage.cpu:.4g} CPU")
212+
if pending_usage.gpu:
213+
pending.append(f"{pending_usage.gpu:.4g} GPU")
214+
pending_str = ", ".join(pending)
215+
resources_status += f" (pending: {pending_str})"
216+
217+
self._total_resources.plain = resources_status
218+
219+
def set_finishing_message(self, desc: str):
220+
if not self._can_update_total():
221+
return
222+
223+
self._total.update(
224+
self._total_task_id,
225+
description=desc)
226+
self._total.stop_task(self._total_task_id)
227+
228+
# Op Progress

0 commit comments

Comments
 (0)