From fc8789cef0842319d885a38c072b126dd9f0afbf Mon Sep 17 00:00:00 2001 From: dancingactor Date: Sat, 6 Dec 2025 11:44:00 +0800 Subject: [PATCH 1/3] [core] Make RPC Chaos Configurations More Readable as JSON Signed-off-by: dancingactor --- .../modules/job/tests/test_job_manager.py | 23 ++++++- .../test_actor_lineage_reconstruction.py | 20 +++++- .../tests/test_core_worker_fault_tolerance.py | 36 +++++++++-- python/ray/tests/test_gcs_fault_tolerance.py | 12 +++- python/ray/tests/test_gcs_utils.py | 19 +++++- .../test_object_manager_fault_tolerance.py | 17 ++++- .../ray/tests/test_raylet_fault_tolerance.py | 64 +++++++++++++++++-- .../ray/tests/test_streaming_generator_4.py | 12 +++- src/ray/rpc/BUILD.bazel | 1 + src/ray/rpc/rpc_chaos.cc | 51 ++++++++------- src/ray/rpc/tests/rpc_chaos_test.cc | 20 ++++-- 11 files changed, 224 insertions(+), 51 deletions(-) diff --git a/python/ray/dashboard/modules/job/tests/test_job_manager.py b/python/ray/dashboard/modules/job/tests/test_job_manager.py index 5d0a9c9b6698..f46c0723672d 100644 --- a/python/ray/dashboard/modules/job/tests/test_job_manager.py +++ b/python/ray/dashboard/modules/job/tests/test_job_manager.py @@ -1,4 +1,5 @@ import asyncio +import json import os import signal import sys @@ -356,6 +357,21 @@ async def test_runtime_env_setup_logged_to_job_driver_logs( assert f"Running entrypoint for job {job_id}: {entrypoint}" in logs +def create_failure_json(rpc_method_failures): + failures_dict = {} + for method_str, failure_str in rpc_method_failures.items(): + num_failures, req_prob, resp_prob, in_flight_prob = map( + int, failure_str.split(":") + ) + failures_dict[method_str] = { + "num_failures": num_failures, + "req_failure_prob": req_prob, + "resp_failure_prob": resp_prob, + "in_flight_failure_prob": in_flight_prob, + } + return json.dumps(failures_dict) + + @pytest.mark.asyncio @pytest.mark.parametrize( "call_ray_start", @@ -419,7 +435,12 @@ async def test_pending_job_timeout_during_new_head_creation( { "cmd": "ray start --head", "env": { - "RAY_testing_rpc_failure": "ray::rpc::InternalKVGcsService.grpc_client.InternalKVGet=3:33:33:33,CoreWorkerService.grpc_client.PushTask=3:33:33:33" + "RAY_testing_rpc_failure": create_failure_json( + { + "ray::rpc::InternalKVGcsService.grpc_client.InternalKVGet": "3:33:33:33", + "CoreWorkerService.grpc_client.PushTask": "3:33:33:33", + } + ) }, }, ], diff --git a/python/ray/tests/test_actor_lineage_reconstruction.py b/python/ray/tests/test_actor_lineage_reconstruction.py index 5f892f3f8713..3b3d9340a0d0 100644 --- a/python/ray/tests/test_actor_lineage_reconstruction.py +++ b/python/ray/tests/test_actor_lineage_reconstruction.py @@ -1,4 +1,5 @@ import gc +import json import os import signal import sys @@ -26,10 +27,25 @@ def test_actor_reconstruction_triggered_by_lineage_reconstruction( # -> actor is permanently dead when there is no reference. # This test also injects network failure to make sure relevant rpcs are retried. failure = RPC_FAILURE_MAP[deterministic_failure] + parts = failure.split(":") monkeypatch.setenv( "RAY_testing_rpc_failure", - f"ray::rpc::ActorInfoGcsService.grpc_client.RestartActorForLineageReconstruction=1:{failure}," - f"ray::rpc::ActorInfoGcsService.grpc_client.ReportActorOutOfScope=1:{failure}", + json.dumps( + { + "ray::rpc::ActorInfoGcsService.grpc_client.RestartActorForLineageReconstruction": { + "num_failures": 1, + "req_failure_prob": int(parts[0]), + "resp_failure_prob": int(parts[1]), + "in_flight_failure_prob": int(parts[2]), + }, + "ray::rpc::ActorInfoGcsService.grpc_client.ReportActorOutOfScope": { + "num_failures": 1, + "req_failure_prob": int(parts[0]), + "resp_failure_prob": int(parts[1]), + "in_flight_failure_prob": int(parts[2]), + }, + } + ), ) cluster = ray_start_cluster cluster.add_node(resources={"head": 1}) diff --git a/python/ray/tests/test_core_worker_fault_tolerance.py b/python/ray/tests/test_core_worker_fault_tolerance.py index 3ff65345870d..7c10e567a596 100644 --- a/python/ray/tests/test_core_worker_fault_tolerance.py +++ b/python/ray/tests/test_core_worker_fault_tolerance.py @@ -1,3 +1,4 @@ +import json import sys import numpy as np @@ -15,6 +16,20 @@ from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy +def create_failure_json(method, num_failures, failure_str): + parts = failure_str.split(":") + return json.dumps( + { + method: { + "num_failures": num_failures, + "req_failure_prob": int(parts[0]), + "resp_failure_prob": int(parts[1]), + "in_flight_failure_prob": int(parts[2]), + } + } + ) + + @pytest.mark.parametrize( "allow_out_of_order_execution", [True, False], @@ -30,7 +45,7 @@ def test_push_actor_task_failure( failure = RPC_FAILURE_MAP[deterministic_failure] m.setenv( "RAY_testing_rpc_failure", - f"CoreWorkerService.grpc_client.PushTask=2:{failure}", + create_failure_json("CoreWorkerService.grpc_client.PushTask", 2, failure), ) m.setenv("RAY_actor_scheduling_queue_max_reorder_wait_seconds", "0") cluster = ray_start_cluster @@ -60,7 +75,9 @@ def test_update_object_location_batch_failure( failure = RPC_FAILURE_MAP[deterministic_failure] m.setenv( "RAY_testing_rpc_failure", - f"CoreWorkerService.grpc_client.UpdateObjectLocationBatch=1:{failure}", + create_failure_json( + "CoreWorkerService.grpc_client.UpdateObjectLocationBatch", 1, failure + ), ) cluster = ray_start_cluster head_node_id = cluster.add_node( @@ -102,7 +119,9 @@ def test_get_object_status_rpc_retry_and_idempotency( failure = RPC_FAILURE_MAP[deterministic_failure] monkeypatch.setenv( "RAY_testing_rpc_failure", - f"CoreWorkerService.grpc_client.GetObjectStatus=1:{failure}", + create_failure_json( + "CoreWorkerService.grpc_client.GetObjectStatus", 1, failure + ), ) ray.init() @@ -134,7 +153,9 @@ def test_wait_for_actor_ref_deleted_rpc_retry_and_idempotency( failure = RPC_FAILURE_MAP[deterministic_failure] monkeypatch.setenv( "RAY_testing_rpc_failure", - f"CoreWorkerService.grpc_client.WaitForActorRefDeleted=1:{failure}", + create_failure_json( + "CoreWorkerService.grpc_client.WaitForActorRefDeleted", 1, failure + ), ) ray.init() @@ -172,7 +193,9 @@ def inject_cancel_remote_task_rpc_failure(monkeypatch, request): failure = RPC_FAILURE_MAP[deterministic_failure] monkeypatch.setenv( "RAY_testing_rpc_failure", - f"CoreWorkerService.grpc_client.CancelRemoteTask=1:{failure}", + create_failure_json( + "CoreWorkerService.grpc_client.CancelRemoteTask", 1, failure + ), ) @@ -213,7 +236,8 @@ def remote_wait(sg): def test_double_borrowing_with_rpc_failure(monkeypatch, shutdown_only): """Regression test for https://github.com/ray-project/ray/issues/57997""" monkeypatch.setenv( - "RAY_testing_rpc_failure", "CoreWorkerService.grpc_client.PushTask=3:0:100:0" + "RAY_testing_rpc_failure", + create_failure_json("CoreWorkerService.grpc_client.PushTask", 3, "0:100:0"), ) ray.init() diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py index f56c2506d1dd..419d5b34d2e2 100644 --- a/python/ray/tests/test_gcs_fault_tolerance.py +++ b/python/ray/tests/test_gcs_fault_tolerance.py @@ -1,4 +1,5 @@ import asyncio +import json import os import signal import subprocess @@ -1261,7 +1262,16 @@ def test_mark_job_finished_rpc_retry_and_idempotency(shutdown_only, monkeypatch) # We inject request failures to force retries and test idempotency monkeypatch.setenv( "RAY_testing_rpc_failure", - "ray::rpc::JobInfoGcsService.grpc_client.MarkJobFinished=3:50:0:0", + json.dumps( + { + "ray::rpc::JobInfoGcsService.grpc_client.MarkJobFinished": { + "num_failures": 3, + "req_failure_prob": 50, + "resp_failure_prob": 0, + "in_flight_failure_prob": 0, + } + } + ), ) ray.init(num_cpus=1) diff --git a/python/ray/tests/test_gcs_utils.py b/python/ray/tests/test_gcs_utils.py index 256e2df07b19..0df2b229d767 100644 --- a/python/ray/tests/test_gcs_utils.py +++ b/python/ray/tests/test_gcs_utils.py @@ -1,5 +1,6 @@ import asyncio import contextlib +import json import os import signal import sys @@ -109,8 +110,22 @@ def test_kv_timeout(ray_start_regular): def test_kv_transient_network_error(shutdown_only, monkeypatch): monkeypatch.setenv( "RAY_testing_rpc_failure", - "ray::rpc::InternalKVGcsService.grpc_client.InternalKVGet=5:25:25:25," - "ray::rpc::InternalKVGcsService.grpc_client.InternalKVPut=5:25:25:25", + json.dumps( + { + "ray::rpc::InternalKVGcsService.grpc_client.InternalKVGet": { + "num_failures": 5, + "req_failure_prob": 25, + "resp_failure_prob": 25, + "in_flight_failure_prob": 25, + }, + "ray::rpc::InternalKVGcsService.grpc_client.InternalKVPut": { + "num_failures": 5, + "req_failure_prob": 25, + "resp_failure_prob": 25, + "in_flight_failure_prob": 25, + }, + } + ), ) ray.init() gcs_address = ray._private.worker.global_worker.gcs_client.address diff --git a/python/ray/tests/test_object_manager_fault_tolerance.py b/python/ray/tests/test_object_manager_fault_tolerance.py index 933baa2de332..141c2489c329 100644 --- a/python/ray/tests/test_object_manager_fault_tolerance.py +++ b/python/ray/tests/test_object_manager_fault_tolerance.py @@ -1,3 +1,4 @@ +import json import sys import numpy as np @@ -13,6 +14,20 @@ from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy +def create_failure_json(method, num_failures, failure_str): + parts = failure_str.split(":") + return json.dumps( + { + method: { + "num_failures": num_failures, + "req_failure_prob": int(parts[0]), + "resp_failure_prob": int(parts[1]), + "in_flight_failure_prob": int(parts[2]), + } + } + ) + + @pytest.mark.parametrize("deterministic_failure", RPC_FAILURE_TYPES) def test_free_objects_idempotent( monkeypatch, shutdown_only, deterministic_failure, ray_start_cluster @@ -20,7 +35,7 @@ def test_free_objects_idempotent( failure = RPC_FAILURE_MAP[deterministic_failure] monkeypatch.setenv( "RAY_testing_rpc_failure", - f"ObjectManagerService.grpc_client.FreeObjects=1:{failure}", + create_failure_json("ObjectManagerService.grpc_client.FreeObjects", 1, failure), ) @ray.remote diff --git a/python/ray/tests/test_raylet_fault_tolerance.py b/python/ray/tests/test_raylet_fault_tolerance.py index ee38d64e3af8..96a3e4ca8224 100644 --- a/python/ray/tests/test_raylet_fault_tolerance.py +++ b/python/ray/tests/test_raylet_fault_tolerance.py @@ -1,3 +1,4 @@ +import json import os import sys @@ -19,6 +20,20 @@ import psutil +def create_failure_json(method, num_failures, failure_str): + parts = failure_str.split(":") + return json.dumps( + { + method: { + "num_failures": num_failures, + "req_failure_prob": int(parts[0]), + "resp_failure_prob": int(parts[1]), + "in_flight_failure_prob": int(parts[2]), + } + } + ) + + @pytest.mark.parametrize("deterministic_failure", RPC_FAILURE_TYPES) def test_request_worker_lease_idempotent( monkeypatch, shutdown_only, deterministic_failure, ray_start_cluster @@ -26,7 +41,9 @@ def test_request_worker_lease_idempotent( failure = RPC_FAILURE_MAP[deterministic_failure] monkeypatch.setenv( "RAY_testing_rpc_failure", - f"NodeManagerService.grpc_client.RequestWorkerLease=1:{failure}", + create_failure_json( + "NodeManagerService.grpc_client.RequestWorkerLease", 1, failure + ), ) @ray.remote @@ -61,7 +78,16 @@ def test_drain_node_idempotent(monkeypatch, shutdown_only, ray_start_cluster): # NOTE: not testing response failure since the node is already marked as draining and shuts down gracefully. monkeypatch.setenv( "RAY_testing_rpc_failure", - "NodeManagerService.grpc_client.DrainRaylet=1:100:0:0", + json.dumps( + { + "NodeManagerService.grpc_client.DrainRaylet": { + "num_failures": 1, + "req_failure_prob": 100, + "resp_failure_prob": 0, + "in_flight_failure_prob": 0, + } + } + ), ) cluster = ray_start_cluster @@ -99,10 +125,25 @@ def node_is_dead(): def inject_release_unused_bundles_rpc_failure(monkeypatch, request): deterministic_failure = request.param failure = RPC_FAILURE_MAP[deterministic_failure] + parts = failure.split(":") monkeypatch.setenv( "RAY_testing_rpc_failure", - f"NodeManagerService.grpc_client.ReleaseUnusedBundles=1:{failure}" - + ",NodeManagerService.grpc_client.CancelResourceReserve=-1:100:0:0", + json.dumps( + { + "NodeManagerService.grpc_client.ReleaseUnusedBundles": { + "num_failures": 1, + "req_failure_prob": int(parts[0]), + "resp_failure_prob": int(parts[1]), + "in_flight_failure_prob": int(parts[2]), + }, + "NodeManagerService.grpc_client.CancelResourceReserve": { + "num_failures": -1, + "req_failure_prob": 100, + "resp_failure_prob": 0, + "in_flight_failure_prob": 0, + }, + } + ), ) @@ -155,7 +196,9 @@ def inject_notify_gcs_restart_rpc_failure(monkeypatch, request): failure = RPC_FAILURE_MAP[deterministic_failure] monkeypatch.setenv( "RAY_testing_rpc_failure", - f"NodeManagerService.grpc_client.NotifyGCSRestart=1:{failure}", + create_failure_json( + "NodeManagerService.grpc_client.NotifyGCSRestart", 1, failure + ), ) @@ -215,7 +258,16 @@ def test_kill_local_actor_rpc_retry_and_idempotency(monkeypatch, shutdown_only): monkeypatch.setenv( "RAY_testing_rpc_failure", - "NodeManagerService.grpc_client.KillLocalActor=1:100:0:0", + json.dumps( + { + "NodeManagerService.grpc_client.KillLocalActor": { + "num_failures": 1, + "req_failure_prob": 100, + "resp_failure_prob": 0, + "in_flight_failure_prob": 0, + } + } + ), ) ray.init() diff --git a/python/ray/tests/test_streaming_generator_4.py b/python/ray/tests/test_streaming_generator_4.py index e0f0c5480885..34f5582cd35c 100644 --- a/python/ray/tests/test_streaming_generator_4.py +++ b/python/ray/tests/test_streaming_generator_4.py @@ -1,5 +1,6 @@ import asyncio import gc +import json import os import random import signal @@ -190,7 +191,16 @@ def test_many_tasks_lineage_reconstruction_mini_stress_test( ) m.setenv( "RAY_testing_rpc_failure", - "CoreWorkerService.grpc_client.ReportGeneratorItemReturns=5:25:25:25", + json.dumps( + { + "CoreWorkerService.grpc_client.ReportGeneratorItemReturns": { + "num_failures": 5, + "req_failure_prob": 25, + "resp_failure_prob": 25, + "in_flight_failure_prob": 25, + } + } + ), ) cluster = ray_start_cluster cluster.add_node( diff --git a/src/ray/rpc/BUILD.bazel b/src/ray/rpc/BUILD.bazel index 654d6a16c4e1..78ded3e8fa6b 100644 --- a/src/ray/rpc/BUILD.bazel +++ b/src/ray/rpc/BUILD.bazel @@ -94,6 +94,7 @@ ray_cc_library( "//src/ray/common:ray_config", "@com_google_absl//absl/container:flat_hash_map", "@com_google_absl//absl/synchronization", + "@nlohmann_json", ], ) diff --git a/src/ray/rpc/rpc_chaos.cc b/src/ray/rpc/rpc_chaos.cc index d957eb23d91c..7d19b9cae626 100644 --- a/src/ray/rpc/rpc_chaos.cc +++ b/src/ray/rpc/rpc_chaos.cc @@ -20,6 +20,7 @@ #include "absl/container/flat_hash_map.h" #include "absl/synchronization/mutex.h" +#include "nlohmann/json.hpp" #include "ray/common/ray_config.h" namespace ray { @@ -30,18 +31,17 @@ namespace testing { // should set up os environment to use this feature for testing purposes. // You can use this to set probabilities for specific rpc's. -// export RAY_testing_rpc_failure="method1=3:12:12:50,method2=5:10:25:25" -// Key is the RPC call name and value is a four part colon separated structure. It -// contains the max number of failures to inject + probability of req failure + -// probability of reply failure + probability of in-flight failure. - +// export +// RAY_testing_rpc_failure='{"method1":{"num_failures":3,"req_failure_prob":12,"resp_failure_prob":12,"in_flight_failure_prob":50}}' +// // You can also use a wildcard to set probabilities for all rpc's and -1 as num_failures // to have unlimited failures. -// export RAY_testing_rpc_failure="*=-1:10:25:50" +// export +// RAY_testing_rpc_failure='{"*":{"num_failures":-1,"req_failure_prob":10,"resp_failure_prob":25,"in_flight_failure_prob":50}}' // This will set the probabilities for all rpc's to 10% for request failures, 25% for // reply failures, and 50% for in-flight failures. -// You can also provide 5th, 6th, and / or 7th optional parameters to specify that there +// You can also provide optional parameters to specify that there // should be at least a certain amount of request, response, and in-flight failures. // By default these are set to 0, but by setting them to positive values guarantees that // the first N RPCs will have X request failures, followed by Y response failures, @@ -54,7 +54,8 @@ namespace testing { // Ex. unlimited failures for all rpc's with 25% request failures, 50% response failures, // and 10% in-flight failures with at least 2 request failures, 3 response failures, and 1 // in-flight failure. -// export RAY_testing_rpc_failure="*=-1:25:50:10:2:3:1" +// export +// RAY_testing_rpc_failure='{"*":{"num_failures":-1,"req_failure_prob":25,"resp_failure_prob":50,"in_flight_failure_prob":10,"num_lower_bound_req_failures":2,"num_lower_bound_resp_failures":3,"num_lower_bound_in_flight_failures":1}}' class RpcFailureManager { public: @@ -72,29 +73,31 @@ class RpcFailureManager { has_failures_ = false; if (!RayConfig::instance().testing_rpc_failure().empty()) { - for (const auto &item : - absl::StrSplit(RayConfig::instance().testing_rpc_failure(), ',')) { - std::vector equal_split = absl::StrSplit(item, '='); - RAY_CHECK_EQ(equal_split.size(), 2UL); - std::vector colon_split = absl::StrSplit(equal_split[1], ':'); - RAY_CHECK_GE(colon_split.size(), 4UL); - RAY_CHECK_LE(colon_split.size(), 7UL); + auto json = nlohmann::json::parse( + RayConfig::instance().testing_rpc_failure(), nullptr, false); + if (json.is_discarded()) { + RAY_LOG(FATAL) << "testing_rpc_failure is not a valid json object: " + << RayConfig::instance().testing_rpc_failure(); + return; + } + + for (auto &[key, value] : json.items()) { auto [iter, _] = failable_methods_.emplace( - equal_split[0], + key, Failable{ - std::stol(colon_split[0]), - std::stoul(colon_split[1]), - std::stoul(colon_split[2]), - std::stoul(colon_split[3]), - colon_split.size() >= 5UL ? std::stoul(colon_split[4]) : 0UL, - colon_split.size() >= 6UL ? std::stoul(colon_split[5]) : 0UL, - colon_split.size() >= 7UL ? std::stoul(colon_split[6]) : 0UL, + value.value("num_failures", 0L), + value.value("req_failure_prob", 0UL), + value.value("resp_failure_prob", 0UL), + value.value("in_flight_failure_prob", 0UL), + value.value("num_lower_bound_req_failures", 0UL), + value.value("num_lower_bound_resp_failures", 0UL), + value.value("num_lower_bound_in_flight_failures", 0UL), }); const auto &failable = iter->second; RAY_CHECK_LE(failable.req_failure_prob + failable.resp_failure_prob + failable.in_flight_failure_prob, 100UL); - if (equal_split[0] == "*") { + if (key == "*") { wildcard_set_ = true; // The wildcard overrides all other method configurations. break; diff --git a/src/ray/rpc/tests/rpc_chaos_test.cc b/src/ray/rpc/tests/rpc_chaos_test.cc index 3b12a8cb7695..878d89bbd73b 100644 --- a/src/ray/rpc/tests/rpc_chaos_test.cc +++ b/src/ray/rpc/tests/rpc_chaos_test.cc @@ -20,7 +20,8 @@ namespace ray::rpc::testing { TEST(RpcChaosTest, MethodRpcFailure) { - RayConfig::instance().testing_rpc_failure() = "method1=0:25:25:25,method2=1:100:0:0"; + RayConfig::instance().testing_rpc_failure() = + R"({"method1":{"num_failures":0,"req_failure_prob":25,"resp_failure_prob":25,"in_flight_failure_prob":25},"method2":{"num_failures":1,"req_failure_prob":100,"resp_failure_prob":0,"in_flight_failure_prob":0}})"; Init(); ASSERT_EQ(GetRpcFailure("unknown"), RpcFailure::None); ASSERT_EQ(GetRpcFailure("method1"), RpcFailure::None); @@ -31,7 +32,7 @@ TEST(RpcChaosTest, MethodRpcFailure) { TEST(RpcChaosTest, MethodRpcFailureEdgeCase) { RayConfig::instance().testing_rpc_failure() = - "method1=1000:100:0:0,method2=1000:0:100:0,method3=1000:0:0:100,method4=1000:0:0:0"; + R"({"method1":{"num_failures":1000,"req_failure_prob":100,"resp_failure_prob":0,"in_flight_failure_prob":0},"method2":{"num_failures":1000,"req_failure_prob":0,"resp_failure_prob":100,"in_flight_failure_prob":0},"method3":{"num_failures":1000,"req_failure_prob":0,"resp_failure_prob":0,"in_flight_failure_prob":100},"method4":{"num_failures":1000,"req_failure_prob":0,"resp_failure_prob":0,"in_flight_failure_prob":0}})"; Init(); for (int i = 0; i < 1000; i++) { ASSERT_EQ(GetRpcFailure("method1"), RpcFailure::Request); @@ -42,25 +43,29 @@ TEST(RpcChaosTest, MethodRpcFailureEdgeCase) { } TEST(RpcChaosTest, WildcardRpcFailure) { - RayConfig::instance().testing_rpc_failure() = "*=-1:100:0:0"; + RayConfig::instance().testing_rpc_failure() = + R"({"*":{"num_failures":-1,"req_failure_prob":100,"resp_failure_prob":0,"in_flight_failure_prob":0}})"; Init(); for (int i = 0; i < 100; i++) { ASSERT_EQ(GetRpcFailure("method"), RpcFailure::Request); } - RayConfig::instance().testing_rpc_failure() = "*=-1:0:100:0"; + RayConfig::instance().testing_rpc_failure() = + R"({"*":{"num_failures":-1,"req_failure_prob":0,"resp_failure_prob":100,"in_flight_failure_prob":0}})"; Init(); for (int i = 0; i < 100; i++) { ASSERT_EQ(GetRpcFailure("method"), RpcFailure::Response); } - RayConfig::instance().testing_rpc_failure() = "*=-1:0:0:100"; + RayConfig::instance().testing_rpc_failure() = + R"({"*":{"num_failures":-1,"req_failure_prob":0,"resp_failure_prob":0,"in_flight_failure_prob":100}})"; Init(); for (int i = 0; i < 100; i++) { ASSERT_EQ(GetRpcFailure("method"), RpcFailure::InFlight); } - RayConfig::instance().testing_rpc_failure() = "*=-1:0:0:0"; + RayConfig::instance().testing_rpc_failure() = + R"({"*":{"num_failures":-1,"req_failure_prob":0,"resp_failure_prob":0,"in_flight_failure_prob":0}})"; Init(); for (int i = 0; i < 100; i++) { ASSERT_EQ(GetRpcFailure("method"), RpcFailure::None); @@ -73,7 +78,8 @@ TEST(RpcChaosTest, LowerBoundWithWildcard) { // 100% req prob after lower bound, 0% resp prob, 0% resp in-flight prob, // 3 guaranteed req failures, 5 guaranteed resp failures, 2 guaranteed resp // in-flight failures - RayConfig::instance().testing_rpc_failure() = "*=-1:100:0:0:3:5:2"; + RayConfig::instance().testing_rpc_failure() = + R"({"*":{"num_failures":-1,"req_failure_prob":100,"resp_failure_prob":0,"in_flight_failure_prob":0,"num_lower_bound_req_failures":3,"num_lower_bound_resp_failures":5,"num_lower_bound_in_flight_failures":2}})"; Init(); // First 3 calls should be guaranteed Request failures (lower bound) From 759ea5403348417ef0cb95b983a74fdcae64fbf2 Mon Sep 17 00:00:00 2001 From: dancingactor Date: Sat, 6 Dec 2025 11:47:00 +0800 Subject: [PATCH 2/3] - add LOG(FATAL) and the test for it - modified variable names Signed-off-by: dancingactor --- src/ray/rpc/rpc_chaos.cc | 31 +++++++++++++++++++---------- src/ray/rpc/tests/rpc_chaos_test.cc | 7 +++++++ 2 files changed, 28 insertions(+), 10 deletions(-) diff --git a/src/ray/rpc/rpc_chaos.cc b/src/ray/rpc/rpc_chaos.cc index 7d19b9cae626..f31a2480f103 100644 --- a/src/ray/rpc/rpc_chaos.cc +++ b/src/ray/rpc/rpc_chaos.cc @@ -81,23 +81,34 @@ class RpcFailureManager { return; } - for (auto &[key, value] : json.items()) { + for (auto &[method, config] : json.items()) { + for (const auto &[config_key, _] : config.items()) { + if (config_key != "num_failures" && config_key != "req_failure_prob" && + config_key != "resp_failure_prob" && + config_key != "in_flight_failure_prob" && + config_key != "num_lower_bound_req_failures" && + config_key != "num_lower_bound_resp_failures" && + config_key != "num_lower_bound_in_flight_failures") { + RAY_LOG(FATAL) << "Unknown key specified in testing_rpc_failure config: " + << config_key; + } + } auto [iter, _] = failable_methods_.emplace( - key, + method, Failable{ - value.value("num_failures", 0L), - value.value("req_failure_prob", 0UL), - value.value("resp_failure_prob", 0UL), - value.value("in_flight_failure_prob", 0UL), - value.value("num_lower_bound_req_failures", 0UL), - value.value("num_lower_bound_resp_failures", 0UL), - value.value("num_lower_bound_in_flight_failures", 0UL), + config.value("num_failures", 0L), + config.value("req_failure_prob", 0UL), + config.value("resp_failure_prob", 0UL), + config.value("in_flight_failure_prob", 0UL), + config.value("num_lower_bound_req_failures", 0UL), + config.value("num_lower_bound_resp_failures", 0UL), + config.value("num_lower_bound_in_flight_failures", 0UL), }); const auto &failable = iter->second; RAY_CHECK_LE(failable.req_failure_prob + failable.resp_failure_prob + failable.in_flight_failure_prob, 100UL); - if (key == "*") { + if (method == "*") { wildcard_set_ = true; // The wildcard overrides all other method configurations. break; diff --git a/src/ray/rpc/tests/rpc_chaos_test.cc b/src/ray/rpc/tests/rpc_chaos_test.cc index 878d89bbd73b..7d44318dbbdd 100644 --- a/src/ray/rpc/tests/rpc_chaos_test.cc +++ b/src/ray/rpc/tests/rpc_chaos_test.cc @@ -126,4 +126,11 @@ TEST(RpcChaosTest, LowerBoundWithWildcard) { } } +TEST(RpcChaosTest, TestInvalidJson) { + RayConfig::instance().testing_rpc_failure() = + R"({"*":{"num_failures":-1,"invalid_key":1}})"; + ASSERT_DEATH(Init(), + "Unknown key specified in testing_rpc_failure config: invalid_key"); +} + } // namespace ray::rpc::testing From c0c009536cac0c204f27c594990e0095c920df66 Mon Sep 17 00:00:00 2001 From: dancingactor Date: Sat, 6 Dec 2025 19:28:49 +0800 Subject: [PATCH 3/3] add validation that config values are JSON objects Signed-off-by: dancingactor --- src/ray/rpc/rpc_chaos.cc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/ray/rpc/rpc_chaos.cc b/src/ray/rpc/rpc_chaos.cc index f31a2480f103..f557d3495ff3 100644 --- a/src/ray/rpc/rpc_chaos.cc +++ b/src/ray/rpc/rpc_chaos.cc @@ -82,6 +82,12 @@ class RpcFailureManager { } for (auto &[method, config] : json.items()) { + if (!config.is_object()) { + RAY_LOG(FATAL) << "Value for method '" << method + << "' in testing_rpc_failure config is not a JSON object: " + << config.dump(); + continue; + } for (const auto &[config_key, _] : config.items()) { if (config_key != "num_failures" && config_key != "req_failure_prob" && config_key != "resp_failure_prob" &&