44import pytest
55
66import ray
7- from ray ._common .test_utils import SignalActor , wait_for_condition
7+ from ray ._common .test_utils import SignalActor
8+ from ray ._private .test_utils import (
9+ RPC_FAILURE_MAP ,
10+ RPC_FAILURE_TYPES ,
11+ wait_for_condition ,
12+ )
813from ray .core .generated import common_pb2 , gcs_pb2
914from ray .exceptions import GetTimeoutError , TaskCancelledError
1015from ray .util .scheduling_strategies import NodeAffinitySchedulingStrategy
1419 "allow_out_of_order_execution" ,
1520 [True , False ],
1621)
17- @pytest .mark .parametrize ("deterministic_failure" , [ "request" , "response" ] )
22+ @pytest .mark .parametrize ("deterministic_failure" , RPC_FAILURE_TYPES )
1823def test_push_actor_task_failure (
1924 monkeypatch ,
2025 ray_start_cluster ,
2126 allow_out_of_order_execution : bool ,
2227 deterministic_failure : str ,
2328):
2429 with monkeypatch .context () as m :
30+ failure = RPC_FAILURE_MAP [deterministic_failure ]
2531 m .setenv (
2632 "RAY_testing_rpc_failure" ,
27- "CoreWorkerService.grpc_client.PushTask=2:"
28- + ("100:0" if deterministic_failure == "request" else "0:100" ),
33+ f"CoreWorkerService.grpc_client.PushTask=2:{ failure } " ,
2934 )
3035 m .setenv ("RAY_actor_scheduling_queue_max_reorder_wait_seconds" , "0" )
3136 cluster = ray_start_cluster
@@ -47,15 +52,15 @@ def echo(self, value):
4752 assert ray .get (refs ) == list (range (10 ))
4853
4954
50- @pytest .mark .parametrize ("deterministic_failure" , [ "request" , "response" ] )
55+ @pytest .mark .parametrize ("deterministic_failure" , RPC_FAILURE_TYPES )
5156def test_update_object_location_batch_failure (
5257 monkeypatch , ray_start_cluster , deterministic_failure
5358):
5459 with monkeypatch .context () as m :
60+ failure = RPC_FAILURE_MAP [deterministic_failure ]
5561 m .setenv (
5662 "RAY_testing_rpc_failure" ,
57- "CoreWorkerService.grpc_client.UpdateObjectLocationBatch=1:"
58- + ("100:0" if deterministic_failure == "request" else "0:100" ),
63+ f"CoreWorkerService.grpc_client.UpdateObjectLocationBatch=1:{ failure } " ,
5964 )
6065 cluster = ray_start_cluster
6166 head_node_id = cluster .add_node (
@@ -85,7 +90,7 @@ def consume_large_object(obj):
8590 assert ray .get (consume_ref , timeout = 10 ) > 0
8691
8792
88- @pytest .mark .parametrize ("deterministic_failure" , [ "request" , "response" ] )
93+ @pytest .mark .parametrize ("deterministic_failure" , RPC_FAILURE_TYPES )
8994def test_get_object_status_rpc_retry_and_idempotency (
9095 monkeypatch , shutdown_only , deterministic_failure
9196):
@@ -94,11 +99,10 @@ def test_get_object_status_rpc_retry_and_idempotency(
9499 Cross_worker_access_task triggers GetObjectStatus because it does
95100 not own objects and needs to request it from the driver.
96101 """
97-
102+ failure = RPC_FAILURE_MAP [ deterministic_failure ]
98103 monkeypatch .setenv (
99104 "RAY_testing_rpc_failure" ,
100- "CoreWorkerService.grpc_client.GetObjectStatus=1:"
101- + ("100:0" if deterministic_failure == "request" else "0:100" ),
105+ f"CoreWorkerService.grpc_client.GetObjectStatus=1:{ failure } " ,
102106 )
103107
104108 ray .init ()
@@ -118,7 +122,7 @@ def cross_worker_access_task(objects):
118122 assert final_result == [0 , 2 , 4 , 6 , 8 ]
119123
120124
121- @pytest .mark .parametrize ("deterministic_failure" , [ "request" , "response" ] )
125+ @pytest .mark .parametrize ("deterministic_failure" , RPC_FAILURE_TYPES )
122126def test_wait_for_actor_ref_deleted_rpc_retry_and_idempotency (
123127 monkeypatch , shutdown_only , deterministic_failure
124128):
@@ -127,11 +131,10 @@ def test_wait_for_actor_ref_deleted_rpc_retry_and_idempotency(
127131 The GCS actor manager will trigger this RPC during actor initialization
128132 to monitor when the actor handles have gone out of scope and the actor should be destroyed.
129133 """
130-
134+ failure = RPC_FAILURE_MAP [ deterministic_failure ]
131135 monkeypatch .setenv (
132136 "RAY_testing_rpc_failure" ,
133- "CoreWorkerService.grpc_client.WaitForActorRefDeleted=1:"
134- + ("100:0" if deterministic_failure == "request" else "0:100" ),
137+ f"CoreWorkerService.grpc_client.WaitForActorRefDeleted=1:{ failure } " ,
135138 )
136139
137140 ray .init ()
@@ -166,15 +169,17 @@ def verify_actor_ref_deleted():
166169@pytest .fixture
167170def inject_cancel_remote_task_rpc_failure (monkeypatch , request ):
168171 deterministic_failure = request .param
172+ failure = RPC_FAILURE_MAP [deterministic_failure ]
169173 monkeypatch .setenv (
170174 "RAY_testing_rpc_failure" ,
171- "CoreWorkerService.grpc_client.CancelRemoteTask=1:"
172- + ("100:0" if deterministic_failure == "request" else "0:100" ),
175+ f"CoreWorkerService.grpc_client.CancelRemoteTask=1:{ failure } " ,
173176 )
174177
175178
176179@pytest .mark .parametrize (
177- "inject_cancel_remote_task_rpc_failure" , ["request" , "response" ], indirect = True
180+ "inject_cancel_remote_task_rpc_failure" ,
181+ RPC_FAILURE_TYPES ,
182+ indirect = True ,
178183)
179184def test_cancel_remote_task_rpc_retry_and_idempotency (
180185 inject_cancel_remote_task_rpc_failure , ray_start_cluster
@@ -208,7 +213,7 @@ def remote_wait(sg):
208213def test_double_borrowing_with_rpc_failure (monkeypatch , shutdown_only ):
209214 """Regression test for https://github.com/ray-project/ray/issues/57997"""
210215 monkeypatch .setenv (
211- "RAY_testing_rpc_failure" , "CoreWorkerService.grpc_client.PushTask=3:0:100"
216+ "RAY_testing_rpc_failure" , "CoreWorkerService.grpc_client.PushTask=3:0:100:0 "
212217 )
213218
214219 ray .init ()
0 commit comments