Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion python/ray/dashboard/modules/job/tests/test_job_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import json
import os
import signal
import sys
Expand Down Expand Up @@ -356,6 +357,21 @@ async def test_runtime_env_setup_logged_to_job_driver_logs(
assert f"Running entrypoint for job {job_id}: {entrypoint}" in logs


def create_failure_json(rpc_method_failures):
failures_dict = {}
for method_str, failure_str in rpc_method_failures.items():
num_failures, req_prob, resp_prob, in_flight_prob = map(
int, failure_str.split(":")
)
failures_dict[method_str] = {
"num_failures": num_failures,
"req_failure_prob": req_prob,
"resp_failure_prob": resp_prob,
"in_flight_failure_prob": in_flight_prob,
}
return json.dumps(failures_dict)


@pytest.mark.asyncio
@pytest.mark.parametrize(
"call_ray_start",
Expand Down Expand Up @@ -419,7 +435,12 @@ async def test_pending_job_timeout_during_new_head_creation(
{
"cmd": "ray start --head",
"env": {
"RAY_testing_rpc_failure": "ray::rpc::InternalKVGcsService.grpc_client.InternalKVGet=3:33:33:33,CoreWorkerService.grpc_client.PushTask=3:33:33:33"
"RAY_testing_rpc_failure": create_failure_json(
{
"ray::rpc::InternalKVGcsService.grpc_client.InternalKVGet": "3:33:33:33",
"CoreWorkerService.grpc_client.PushTask": "3:33:33:33",
}
)
},
},
],
Expand Down
20 changes: 18 additions & 2 deletions python/ray/tests/test_actor_lineage_reconstruction.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import gc
import json
import os
import signal
import sys
Expand Down Expand Up @@ -26,10 +27,25 @@ def test_actor_reconstruction_triggered_by_lineage_reconstruction(
# -> actor is permanently dead when there is no reference.
# This test also injects network failure to make sure relevant rpcs are retried.
failure = RPC_FAILURE_MAP[deterministic_failure]
parts = failure.split(":")
monkeypatch.setenv(
"RAY_testing_rpc_failure",
f"ray::rpc::ActorInfoGcsService.grpc_client.RestartActorForLineageReconstruction=1:{failure},"
f"ray::rpc::ActorInfoGcsService.grpc_client.ReportActorOutOfScope=1:{failure}",
json.dumps(
{
"ray::rpc::ActorInfoGcsService.grpc_client.RestartActorForLineageReconstruction": {
"num_failures": 1,
"req_failure_prob": int(parts[0]),
"resp_failure_prob": int(parts[1]),
"in_flight_failure_prob": int(parts[2]),
},
"ray::rpc::ActorInfoGcsService.grpc_client.ReportActorOutOfScope": {
"num_failures": 1,
"req_failure_prob": int(parts[0]),
"resp_failure_prob": int(parts[1]),
"in_flight_failure_prob": int(parts[2]),
},
}
),
)
cluster = ray_start_cluster
cluster.add_node(resources={"head": 1})
Expand Down
36 changes: 30 additions & 6 deletions python/ray/tests/test_core_worker_fault_tolerance.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import sys

import numpy as np
Expand All @@ -15,6 +16,20 @@
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy


def create_failure_json(method, num_failures, failure_str):
parts = failure_str.split(":")
return json.dumps(
{
method: {
"num_failures": num_failures,
"req_failure_prob": int(parts[0]),
"resp_failure_prob": int(parts[1]),
"in_flight_failure_prob": int(parts[2]),
}
}
)
Comment on lines +19 to +30
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This helper function create_failure_json is very useful for creating the RPC failure configuration. I noticed it's also defined in test_object_manager_fault_tolerance.py and test_raylet_fault_tolerance.py. To avoid code duplication and improve maintainability, would you consider moving this function to a shared test utility module, such as ray/_private/test_utils.py? This would allow all tests to import and use a single implementation.



@pytest.mark.parametrize(
"allow_out_of_order_execution",
[True, False],
Expand All @@ -30,7 +45,7 @@ def test_push_actor_task_failure(
failure = RPC_FAILURE_MAP[deterministic_failure]
m.setenv(
"RAY_testing_rpc_failure",
f"CoreWorkerService.grpc_client.PushTask=2:{failure}",
create_failure_json("CoreWorkerService.grpc_client.PushTask", 2, failure),
)
m.setenv("RAY_actor_scheduling_queue_max_reorder_wait_seconds", "0")
cluster = ray_start_cluster
Expand Down Expand Up @@ -60,7 +75,9 @@ def test_update_object_location_batch_failure(
failure = RPC_FAILURE_MAP[deterministic_failure]
m.setenv(
"RAY_testing_rpc_failure",
f"CoreWorkerService.grpc_client.UpdateObjectLocationBatch=1:{failure}",
create_failure_json(
"CoreWorkerService.grpc_client.UpdateObjectLocationBatch", 1, failure
),
)
cluster = ray_start_cluster
head_node_id = cluster.add_node(
Expand Down Expand Up @@ -102,7 +119,9 @@ def test_get_object_status_rpc_retry_and_idempotency(
failure = RPC_FAILURE_MAP[deterministic_failure]
monkeypatch.setenv(
"RAY_testing_rpc_failure",
f"CoreWorkerService.grpc_client.GetObjectStatus=1:{failure}",
create_failure_json(
"CoreWorkerService.grpc_client.GetObjectStatus", 1, failure
),
)

ray.init()
Expand Down Expand Up @@ -134,7 +153,9 @@ def test_wait_for_actor_ref_deleted_rpc_retry_and_idempotency(
failure = RPC_FAILURE_MAP[deterministic_failure]
monkeypatch.setenv(
"RAY_testing_rpc_failure",
f"CoreWorkerService.grpc_client.WaitForActorRefDeleted=1:{failure}",
create_failure_json(
"CoreWorkerService.grpc_client.WaitForActorRefDeleted", 1, failure
),
)

ray.init()
Expand Down Expand Up @@ -172,7 +193,9 @@ def inject_cancel_remote_task_rpc_failure(monkeypatch, request):
failure = RPC_FAILURE_MAP[deterministic_failure]
monkeypatch.setenv(
"RAY_testing_rpc_failure",
f"CoreWorkerService.grpc_client.CancelRemoteTask=1:{failure}",
create_failure_json(
"CoreWorkerService.grpc_client.CancelRemoteTask", 1, failure
),
)


Expand Down Expand Up @@ -213,7 +236,8 @@ def remote_wait(sg):
def test_double_borrowing_with_rpc_failure(monkeypatch, shutdown_only):
"""Regression test for https://github.com/ray-project/ray/issues/57997"""
monkeypatch.setenv(
"RAY_testing_rpc_failure", "CoreWorkerService.grpc_client.PushTask=3:0:100:0"
"RAY_testing_rpc_failure",
create_failure_json("CoreWorkerService.grpc_client.PushTask", 3, "0:100:0"),
)

ray.init()
Expand Down
12 changes: 11 additions & 1 deletion python/ray/tests/test_gcs_fault_tolerance.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import json
import os
import signal
import subprocess
Expand Down Expand Up @@ -1261,7 +1262,16 @@ def test_mark_job_finished_rpc_retry_and_idempotency(shutdown_only, monkeypatch)
# We inject request failures to force retries and test idempotency
monkeypatch.setenv(
"RAY_testing_rpc_failure",
"ray::rpc::JobInfoGcsService.grpc_client.MarkJobFinished=3:50:0:0",
json.dumps(
{
"ray::rpc::JobInfoGcsService.grpc_client.MarkJobFinished": {
"num_failures": 3,
"req_failure_prob": 50,
"resp_failure_prob": 0,
"in_flight_failure_prob": 0,
}
}
),
)

ray.init(num_cpus=1)
Expand Down
19 changes: 17 additions & 2 deletions python/ray/tests/test_gcs_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import contextlib
import json
import os
import signal
import sys
Expand Down Expand Up @@ -109,8 +110,22 @@ def test_kv_timeout(ray_start_regular):
def test_kv_transient_network_error(shutdown_only, monkeypatch):
monkeypatch.setenv(
"RAY_testing_rpc_failure",
"ray::rpc::InternalKVGcsService.grpc_client.InternalKVGet=5:25:25:25,"
"ray::rpc::InternalKVGcsService.grpc_client.InternalKVPut=5:25:25:25",
json.dumps(
{
"ray::rpc::InternalKVGcsService.grpc_client.InternalKVGet": {
"num_failures": 5,
"req_failure_prob": 25,
"resp_failure_prob": 25,
"in_flight_failure_prob": 25,
},
"ray::rpc::InternalKVGcsService.grpc_client.InternalKVPut": {
"num_failures": 5,
"req_failure_prob": 25,
"resp_failure_prob": 25,
"in_flight_failure_prob": 25,
},
}
),
)
ray.init()
gcs_address = ray._private.worker.global_worker.gcs_client.address
Expand Down
17 changes: 16 additions & 1 deletion python/ray/tests/test_object_manager_fault_tolerance.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import sys

import numpy as np
Expand All @@ -13,14 +14,28 @@
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy


def create_failure_json(method, num_failures, failure_str):
parts = failure_str.split(":")
return json.dumps(
{
method: {
"num_failures": num_failures,
"req_failure_prob": int(parts[0]),
"resp_failure_prob": int(parts[1]),
"in_flight_failure_prob": int(parts[2]),
}
}
)


@pytest.mark.parametrize("deterministic_failure", RPC_FAILURE_TYPES)
def test_free_objects_idempotent(
monkeypatch, shutdown_only, deterministic_failure, ray_start_cluster
):
failure = RPC_FAILURE_MAP[deterministic_failure]
monkeypatch.setenv(
"RAY_testing_rpc_failure",
f"ObjectManagerService.grpc_client.FreeObjects=1:{failure}",
create_failure_json("ObjectManagerService.grpc_client.FreeObjects", 1, failure),
)

@ray.remote
Expand Down
64 changes: 58 additions & 6 deletions python/ray/tests/test_raylet_fault_tolerance.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import os
import sys

Expand All @@ -19,14 +20,30 @@
import psutil


def create_failure_json(method, num_failures, failure_str):
parts = failure_str.split(":")
return json.dumps(
{
method: {
"num_failures": num_failures,
"req_failure_prob": int(parts[0]),
"resp_failure_prob": int(parts[1]),
"in_flight_failure_prob": int(parts[2]),
}
}
)


@pytest.mark.parametrize("deterministic_failure", RPC_FAILURE_TYPES)
def test_request_worker_lease_idempotent(
monkeypatch, shutdown_only, deterministic_failure, ray_start_cluster
):
failure = RPC_FAILURE_MAP[deterministic_failure]
monkeypatch.setenv(
"RAY_testing_rpc_failure",
f"NodeManagerService.grpc_client.RequestWorkerLease=1:{failure}",
create_failure_json(
"NodeManagerService.grpc_client.RequestWorkerLease", 1, failure
),
)

@ray.remote
Expand Down Expand Up @@ -61,7 +78,16 @@ def test_drain_node_idempotent(monkeypatch, shutdown_only, ray_start_cluster):
# NOTE: not testing response failure since the node is already marked as draining and shuts down gracefully.
monkeypatch.setenv(
"RAY_testing_rpc_failure",
"NodeManagerService.grpc_client.DrainRaylet=1:100:0:0",
json.dumps(
{
"NodeManagerService.grpc_client.DrainRaylet": {
"num_failures": 1,
"req_failure_prob": 100,
"resp_failure_prob": 0,
"in_flight_failure_prob": 0,
}
}
),
)

cluster = ray_start_cluster
Expand Down Expand Up @@ -99,10 +125,25 @@ def node_is_dead():
def inject_release_unused_bundles_rpc_failure(monkeypatch, request):
deterministic_failure = request.param
failure = RPC_FAILURE_MAP[deterministic_failure]
parts = failure.split(":")
monkeypatch.setenv(
"RAY_testing_rpc_failure",
f"NodeManagerService.grpc_client.ReleaseUnusedBundles=1:{failure}"
+ ",NodeManagerService.grpc_client.CancelResourceReserve=-1:100:0:0",
json.dumps(
{
"NodeManagerService.grpc_client.ReleaseUnusedBundles": {
"num_failures": 1,
"req_failure_prob": int(parts[0]),
"resp_failure_prob": int(parts[1]),
"in_flight_failure_prob": int(parts[2]),
},
"NodeManagerService.grpc_client.CancelResourceReserve": {
"num_failures": -1,
"req_failure_prob": 100,
"resp_failure_prob": 0,
"in_flight_failure_prob": 0,
},
}
),
)


Expand Down Expand Up @@ -155,7 +196,9 @@ def inject_notify_gcs_restart_rpc_failure(monkeypatch, request):
failure = RPC_FAILURE_MAP[deterministic_failure]
monkeypatch.setenv(
"RAY_testing_rpc_failure",
f"NodeManagerService.grpc_client.NotifyGCSRestart=1:{failure}",
create_failure_json(
"NodeManagerService.grpc_client.NotifyGCSRestart", 1, failure
),
)


Expand Down Expand Up @@ -215,7 +258,16 @@ def test_kill_local_actor_rpc_retry_and_idempotency(monkeypatch, shutdown_only):

monkeypatch.setenv(
"RAY_testing_rpc_failure",
"NodeManagerService.grpc_client.KillLocalActor=1:100:0:0",
json.dumps(
{
"NodeManagerService.grpc_client.KillLocalActor": {
"num_failures": 1,
"req_failure_prob": 100,
"resp_failure_prob": 0,
"in_flight_failure_prob": 0,
}
}
),
)

ray.init()
Expand Down
12 changes: 11 additions & 1 deletion python/ray/tests/test_streaming_generator_4.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import gc
import json
import os
import random
import signal
Expand Down Expand Up @@ -190,7 +191,16 @@ def test_many_tasks_lineage_reconstruction_mini_stress_test(
)
m.setenv(
"RAY_testing_rpc_failure",
"CoreWorkerService.grpc_client.ReportGeneratorItemReturns=5:25:25:25",
json.dumps(
{
"CoreWorkerService.grpc_client.ReportGeneratorItemReturns": {
"num_failures": 5,
"req_failure_prob": 25,
"resp_failure_prob": 25,
"in_flight_failure_prob": 25,
}
}
),
)
cluster = ray_start_cluster
cluster.add_node(
Expand Down
Loading