Skip to content

Commit a0675a5

Browse files
authored
[core] deprecate raw_metrics api in tests (#58614)
Deprecate the `raw_metrics` API and replace all of its uses with the `raw_metric_timeseries` API. `raw_metrics` returns the current snapshot of a set of metrics, while `raw_metric_timeseries` returns the full time series. The latter is more reliable when checking the latest instance of several independent metrics. Test: - CI Signed-off-by: Cuong Nguyen <can@anyscale.com>
1 parent b768e90 commit a0675a5

File tree

6 files changed

+76
-50
lines changed

6 files changed

+76
-50
lines changed

python/ray/_private/test_utils.py

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1065,20 +1065,6 @@ def fetch_prometheus_metric_timeseries(
10651065
return samples_by_name
10661066

10671067

1068-
def raw_metrics(info: RayContext) -> Dict[str, List[Any]]:
1069-
"""Return prometheus metrics from a RayContext
1070-
1071-
Args:
1072-
info: Ray context returned from ray.init()
1073-
1074-
Returns:
1075-
Dict from metric name to a list of samples for the metrics
1076-
"""
1077-
metrics_page = "localhost:{}".format(info.address_info["metrics_export_port"])
1078-
print("Fetch metrics from", metrics_page)
1079-
return fetch_prometheus_metrics([metrics_page])
1080-
1081-
10821068
def raw_metric_timeseries(
10831069
info: RayContext, result: PrometheusTimeseries
10841070
) -> Dict[str, List[Any]]:

python/ray/tests/test_memory_pressure.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
)
1414
from ray._private.grpc_utils import init_grpc_channel
1515
from ray._private.state_api_test_utils import verify_failed_task
16-
from ray._private.test_utils import raw_metrics
16+
from ray._private.test_utils import PrometheusTimeseries, raw_metric_timeseries
1717
from ray._private.utils import get_used_memory
1818
from ray.util.state.state_manager import StateDataSourceClient
1919

@@ -118,8 +118,10 @@ def get_additional_bytes_to_reach_memory_usage_pct(pct: float) -> int:
118118
return bytes_needed
119119

120120

121-
def has_metric_tagged_with_value(addr, tag, value) -> bool:
122-
metrics = raw_metrics(addr)
121+
def has_metric_tagged_with_value(
122+
addr, tag, value, timeseries: PrometheusTimeseries
123+
) -> bool:
124+
metrics = raw_metric_timeseries(addr, timeseries)
123125
for name, samples in metrics.items():
124126
for sample in samples:
125127
if tag in set(sample.labels.values()) and sample.value == value:
@@ -145,13 +147,15 @@ def test_restartable_actor_throws_oom_error(ray_with_memory_monitor, restartable
145147
with pytest.raises(ray.exceptions.OutOfMemoryError):
146148
ray.get(leaker.allocate.remote(bytes_to_alloc, memory_monitor_refresh_ms * 3))
147149

150+
timeseries = PrometheusTimeseries()
148151
wait_for_condition(
149152
has_metric_tagged_with_value,
150153
timeout=10,
151154
retry_interval_ms=100,
152155
addr=addr,
153156
tag="MemoryManager.ActorEviction.Total",
154157
value=2.0 if restartable else 1.0,
158+
timeseries=timeseries,
155159
)
156160

157161
wait_for_condition(
@@ -161,6 +165,7 @@ def test_restartable_actor_throws_oom_error(ray_with_memory_monitor, restartable
161165
addr=addr,
162166
tag="Leaker.__init__",
163167
value=2.0 if restartable else 1.0,
168+
timeseries=timeseries,
164169
)
165170

166171

@@ -180,13 +185,15 @@ def test_restartable_actor_oom_retry_off_throws_oom_error(
180185
with pytest.raises(ray.exceptions.OutOfMemoryError) as _:
181186
ray.get(leaker.allocate.remote(bytes_to_alloc, memory_monitor_refresh_ms * 3))
182187

188+
timeseries = PrometheusTimeseries()
183189
wait_for_condition(
184190
has_metric_tagged_with_value,
185191
timeout=10,
186192
retry_interval_ms=100,
187193
addr=addr,
188194
tag="MemoryManager.ActorEviction.Total",
189195
value=2.0,
196+
timeseries=timeseries,
190197
)
191198
wait_for_condition(
192199
has_metric_tagged_with_value,
@@ -195,6 +202,7 @@ def test_restartable_actor_oom_retry_off_throws_oom_error(
195202
addr=addr,
196203
tag="Leaker.__init__",
197204
value=2.0,
205+
timeseries=timeseries,
198206
)
199207

200208

@@ -210,13 +218,15 @@ def test_non_retryable_task_killed_by_memory_monitor_with_oom_error(
210218
with pytest.raises(ray.exceptions.OutOfMemoryError) as _:
211219
ray.get(allocate_memory.options(max_retries=0).remote(bytes_to_alloc))
212220

221+
timeseries = PrometheusTimeseries()
213222
wait_for_condition(
214223
has_metric_tagged_with_value,
215224
timeout=10,
216225
retry_interval_ms=100,
217226
addr=addr,
218227
tag="MemoryManager.TaskEviction.Total",
219228
value=1.0,
229+
timeseries=timeseries,
220230
)
221231
wait_for_condition(
222232
has_metric_tagged_with_value,
@@ -225,6 +235,7 @@ def test_non_retryable_task_killed_by_memory_monitor_with_oom_error(
225235
addr=addr,
226236
tag="allocate_memory",
227237
value=1.0,
238+
timeseries=timeseries,
228239
)
229240

230241

@@ -372,13 +383,15 @@ def test_task_oom_no_oom_retry_fails_immediately(
372383
)
373384
)
374385

386+
timeseries = PrometheusTimeseries()
375387
wait_for_condition(
376388
has_metric_tagged_with_value,
377389
timeout=10,
378390
retry_interval_ms=100,
379391
addr=addr,
380392
tag="MemoryManager.TaskEviction.Total",
381393
value=1.0,
394+
timeseries=timeseries,
382395
)
383396
wait_for_condition(
384397
has_metric_tagged_with_value,
@@ -387,6 +400,7 @@ def test_task_oom_no_oom_retry_fails_immediately(
387400
addr=addr,
388401
tag="allocate_memory",
389402
value=1.0,
403+
timeseries=timeseries,
390404
)
391405

392406

@@ -411,13 +425,15 @@ def test_task_oom_only_uses_oom_retry(
411425
)
412426
)
413427

428+
timeseries = PrometheusTimeseries()
414429
wait_for_condition(
415430
has_metric_tagged_with_value,
416431
timeout=10,
417432
retry_interval_ms=100,
418433
addr=addr,
419434
tag="MemoryManager.TaskEviction.Total",
420435
value=task_oom_retries + 1,
436+
timeseries=timeseries,
421437
)
422438
wait_for_condition(
423439
has_metric_tagged_with_value,
@@ -426,6 +442,7 @@ def test_task_oom_only_uses_oom_retry(
426442
addr=addr,
427443
tag="allocate_memory",
428444
value=task_oom_retries + 1,
445+
timeseries=timeseries,
429446
)
430447

431448

@@ -502,6 +519,7 @@ def infinite_retry_task():
502519
time.sleep(5)
503520

504521
with ray.init() as addr:
522+
timeseries = PrometheusTimeseries()
505523
with pytest.raises(ray.exceptions.OutOfMemoryError) as _:
506524
ray.get(infinite_retry_task.remote())
507525

@@ -512,6 +530,7 @@ def infinite_retry_task():
512530
addr=addr,
513531
tag="MemoryManager.TaskEviction.Total",
514532
value=1.0,
533+
timeseries=timeseries,
515534
)
516535

517536

python/ray/tests/test_object_store_metrics.py

Lines changed: 32 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
import ray
1010
from ray._common.test_utils import wait_for_condition
1111
from ray._private.test_utils import (
12-
raw_metrics,
12+
PrometheusTimeseries,
13+
raw_metric_timeseries,
1314
)
1415
from ray._private.worker import RayContext
1516
from ray.dashboard.consts import RAY_DASHBOARD_STATS_UPDATING_INTERVAL
@@ -26,8 +27,10 @@
2627
}
2728

2829

29-
def _objects_by_tag(info: RayContext, tag: str) -> Dict:
30-
res = raw_metrics(info)
30+
def _objects_by_tag(
31+
info: RayContext, tag: str, timeseries: PrometheusTimeseries
32+
) -> Dict:
33+
res = raw_metric_timeseries(info, timeseries)
3134
objects_info = defaultdict(int)
3235
if "ray_object_store_memory" in res:
3336
for sample in res["ray_object_store_memory"]:
@@ -41,12 +44,12 @@ def _objects_by_tag(info: RayContext, tag: str) -> Dict:
4144
return objects_info
4245

4346

44-
def objects_by_seal_state(info: RayContext) -> Dict:
45-
return _objects_by_tag(info, "ObjectState")
47+
def objects_by_seal_state(info: RayContext, timeseries: PrometheusTimeseries) -> Dict:
48+
return _objects_by_tag(info, "ObjectState", timeseries)
4649

4750

48-
def objects_by_loc(info: RayContext) -> Dict:
49-
return _objects_by_tag(info, "Location")
51+
def objects_by_loc(info: RayContext, timeseries: PrometheusTimeseries) -> Dict:
52+
return _objects_by_tag(info, "Location", timeseries)
5053

5154

5255
def approx_eq_dict_in(actual: Dict, expected: Dict, e: int) -> bool:
@@ -79,6 +82,7 @@ def test_shared_memory_and_inline_worker_heap(shutdown_only):
7982
},
8083
},
8184
)
85+
timeseries = PrometheusTimeseries()
8286

8387
# Allocate 80MiB data
8488
objs_in_use = ray.get(
@@ -94,7 +98,7 @@ def test_shared_memory_and_inline_worker_heap(shutdown_only):
9498

9599
wait_for_condition(
96100
# 1KiB for metadata difference
97-
lambda: approx_eq_dict_in(objects_by_loc(info), expected, 2 * KiB),
101+
lambda: approx_eq_dict_in(objects_by_loc(info, timeseries), expected, 2 * KiB),
98102
timeout=20,
99103
retry_interval_ms=500,
100104
)
@@ -117,7 +121,7 @@ def func():
117121

118122
wait_for_condition(
119123
# 4 KiB for metadata difference
120-
lambda: approx_eq_dict_in(objects_by_loc(info), expected, 4 * KiB),
124+
lambda: approx_eq_dict_in(objects_by_loc(info, timeseries), expected, 4 * KiB),
121125
timeout=20,
122126
retry_interval_ms=500,
123127
)
@@ -136,7 +140,7 @@ def func():
136140

137141
wait_for_condition(
138142
# 1KiB for metadata difference
139-
lambda: approx_eq_dict_in(objects_by_loc(info), expected, 2 * KiB),
143+
lambda: approx_eq_dict_in(objects_by_loc(info, timeseries), expected, 2 * KiB),
140144
timeout=20,
141145
retry_interval_ms=500,
142146
)
@@ -158,7 +162,7 @@ def test_spilling(object_spilling_config, shutdown_only):
158162
**{"object_spilling_config": object_spilling_config},
159163
},
160164
)
161-
165+
timeseries = PrometheusTimeseries()
162166
# Create and use 100MiB data, which should fit in memory
163167
objs1 = [ray.put(np.zeros(50 * MiB, dtype=np.uint8)) for _ in range(2)]
164168

@@ -171,7 +175,7 @@ def test_spilling(object_spilling_config, shutdown_only):
171175

172176
wait_for_condition(
173177
# 1KiB for metadata difference
174-
lambda: approx_eq_dict_in(objects_by_loc(info), expected, 1 * KiB),
178+
lambda: approx_eq_dict_in(objects_by_loc(info, timeseries), expected, 1 * KiB),
175179
timeout=20,
176180
retry_interval_ms=500,
177181
)
@@ -187,7 +191,7 @@ def test_spilling(object_spilling_config, shutdown_only):
187191
}
188192
wait_for_condition(
189193
# 1KiB for metadata difference
190-
lambda: approx_eq_dict_in(objects_by_loc(info), expected, 1 * KiB),
194+
lambda: approx_eq_dict_in(objects_by_loc(info, timeseries), expected, 1 * KiB),
191195
timeout=20,
192196
retry_interval_ms=500,
193197
)
@@ -202,7 +206,7 @@ def test_spilling(object_spilling_config, shutdown_only):
202206
}
203207
wait_for_condition(
204208
# 1KiB for metadata difference
205-
lambda: approx_eq_dict_in(objects_by_loc(info), expected, 1 * KiB),
209+
lambda: approx_eq_dict_in(objects_by_loc(info, timeseries), expected, 1 * KiB),
206210
timeout=20,
207211
retry_interval_ms=500,
208212
)
@@ -217,7 +221,7 @@ def test_spilling(object_spilling_config, shutdown_only):
217221
}
218222
wait_for_condition(
219223
# 1KiB for metadata difference
220-
lambda: approx_eq_dict_in(objects_by_loc(info), expected, 1 * KiB),
224+
lambda: approx_eq_dict_in(objects_by_loc(info, timeseries), expected, 1 * KiB),
221225
timeout=20,
222226
retry_interval_ms=500,
223227
)
@@ -239,6 +243,7 @@ def test_fallback_memory(shutdown_only):
239243
object_store_memory=expected_in_memory * obj_size_mb * MiB + delta_mb * MiB,
240244
_system_config=_SYSTEM_CONFIG,
241245
)
246+
timeseries = PrometheusTimeseries()
242247
obj_refs = [
243248
ray.put(np.zeros(obj_size_mb * MiB, dtype=np.uint8))
244249
for _ in range(expected_in_memory)
@@ -257,7 +262,7 @@ def test_fallback_memory(shutdown_only):
257262

258263
wait_for_condition(
259264
# 2KiB for metadata difference
260-
lambda: approx_eq_dict_in(objects_by_loc(info), expected, 3 * KiB),
265+
lambda: approx_eq_dict_in(objects_by_loc(info, timeseries), expected, 3 * KiB),
261266
timeout=20,
262267
retry_interval_ms=500,
263268
)
@@ -285,7 +290,7 @@ def test_fallback_memory(shutdown_only):
285290

286291
wait_for_condition(
287292
# 3KiB for metadata difference
288-
lambda: approx_eq_dict_in(objects_by_loc(info), expected, 3 * KiB),
293+
lambda: approx_eq_dict_in(objects_by_loc(info, timeseries), expected, 3 * KiB),
289294
timeout=20,
290295
retry_interval_ms=500,
291296
)
@@ -305,7 +310,7 @@ def test_fallback_memory(shutdown_only):
305310

306311
wait_for_condition(
307312
# 3KiB for metadata difference
308-
lambda: approx_eq_dict_in(objects_by_loc(info), expected, 3 * KiB),
313+
lambda: approx_eq_dict_in(objects_by_loc(info, timeseries), expected, 3 * KiB),
309314
timeout=20,
310315
retry_interval_ms=500,
311316
)
@@ -322,7 +327,7 @@ def test_seal_memory(shutdown_only):
322327
object_store_memory=100 * MiB,
323328
_system_config=_SYSTEM_CONFIG,
324329
)
325-
330+
timeseries = PrometheusTimeseries()
326331
# Allocate 80MiB data
327332
objs_in_use = ray.get(
328333
[ray.put(np.zeros(20 * MiB, dtype=np.uint8)) for _ in range(4)]
@@ -335,7 +340,9 @@ def test_seal_memory(shutdown_only):
335340

336341
wait_for_condition(
337342
# 1KiB for metadata difference
338-
lambda: approx_eq_dict_in(objects_by_seal_state(info), expected, 2 * KiB),
343+
lambda: approx_eq_dict_in(
344+
objects_by_seal_state(info, timeseries), expected, 2 * KiB
345+
),
339346
timeout=20,
340347
retry_interval_ms=500,
341348
)
@@ -349,7 +356,9 @@ def test_seal_memory(shutdown_only):
349356

350357
wait_for_condition(
351358
# 1KiB for metadata difference
352-
lambda: approx_eq_dict_in(objects_by_seal_state(info), expected, 2 * KiB),
359+
lambda: approx_eq_dict_in(
360+
objects_by_seal_state(info, timeseries), expected, 2 * KiB
361+
),
353362
timeout=20,
354363
retry_interval_ms=500,
355364
)
@@ -362,9 +371,10 @@ def test_object_store_memory_matches_dashboard_obj_memory(shutdown_only):
362371
ctx = ray.init(
363372
object_store_memory=500 * MiB,
364373
)
374+
timeseries = PrometheusTimeseries()
365375

366376
def verify():
367-
resources = raw_metrics(ctx)["ray_resources"]
377+
resources = raw_metric_timeseries(ctx, timeseries)["ray_resources"]
368378
object_store_memory_bytes_from_metrics = 0
369379
for sample in resources:
370380
# print(sample)

0 commit comments

Comments
 (0)