Skip to content

Commit

Permalink
[Core][RFC] RPC network error chaos test framework (#48007)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
  • Loading branch information
jjyao authored Oct 17, 2024
1 parent 059f4ba commit d5fa9a0
Show file tree
Hide file tree
Showing 10 changed files with 258 additions and 11 deletions.
15 changes: 15 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,10 @@ ray_cc_library(
"src/ray/rpc/common.cc",
"src/ray/rpc/grpc_server.cc",
"src/ray/rpc/server_call.cc",
"src/ray/rpc/rpc_chaos.cc",
],
hdrs = glob([
"src/ray/rpc/rpc_chaos.h",
"src/ray/rpc/client_call.h",
"src/ray/rpc/common.h",
"src/ray/rpc/grpc_client.h",
Expand Down Expand Up @@ -1551,6 +1553,19 @@ ray_cc_test(
],
)

ray_cc_test(
name = "rpc_chaos_test",
size = "small",
srcs = [
"src/ray/rpc/test/rpc_chaos_test.cc",
],
tags = ["team:core"],
deps = [
":grpc_common_lib",
"@com_google_googletest//:gtest_main",
],
)

ray_cc_test(
name = "core_worker_client_pool_test",
size = "small",
Expand Down
14 changes: 14 additions & 0 deletions python/ray/tests/test_gcs_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,20 @@ def test_kv_timeout(ray_start_regular):
gcs_client.internal_kv_del(b"A", True, b"NS", timeout=2)


def test_kv_transient_network_error(shutdown_only, monkeypatch):
monkeypatch.setenv(
"RAY_testing_rpc_failure",
"ray::rpc::InternalKVGcsService.grpc_client.InternalKVGet=5,"
"ray::rpc::InternalKVGcsService.grpc_client.InternalKVPut=5",
)
ray.init()
gcs_address = ray._private.worker.global_worker.gcs_client.address
gcs_client = ray._raylet.GcsClient(address=gcs_address, nums_reconnect_retry=0)

gcs_client.internal_kv_put(b"A", b"Hello", True, b"")
assert gcs_client.internal_kv_get(b"A", b"") == b"Hello"


@pytest.mark.asyncio
async def test_kv_basic_aio(ray_start_regular):
gcs_client = gcs_utils.GcsAioClient(
Expand Down
5 changes: 5 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,11 @@ RAY_CONFIG(std::string, REDIS_SERVER_NAME, "")
// it will apply to all methods.
RAY_CONFIG(std::string, testing_asio_delay_us, "")

/// To use this, simply do
/// export
/// RAY_testing_rpc_failure="method1=max_num_failures,method2=max_num_failures"
RAY_CONFIG(std::string, testing_rpc_failure, "")

/// The following are configs for the health check. They are borrowed
/// from k8s health probe (shorturl.at/jmTY3)
/// The delay to send the first health check.
Expand Down
1 change: 1 addition & 0 deletions src/ray/core_worker/core_worker_process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ void CoreWorkerProcessImpl::InitializeSystemConfig() {

RayConfig::instance().initialize(promise.get_future().get());
ray::asio::testing::init();
ray::rpc::testing::init();
}

void CoreWorkerProcessImpl::RunWorkerTaskExecutionLoop() {
Expand Down
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/gcs_server_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ int main(int argc, char *argv[]) {

RayConfig::instance().initialize(config_list);
ray::asio::testing::init();
ray::rpc::testing::init();

// IO Service for main loop.
instrumented_io_context main_service;
Expand Down
1 change: 1 addition & 0 deletions src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ int main(int argc, char *argv[]) {
RAY_CHECK(stored_raylet_config.has_value());
RayConfig::instance().initialize(*stored_raylet_config);
ray::asio::testing::init();
ray::rpc::testing::init();

// Core worker tries to kill child processes when it exits. But they can't do
// it perfectly: if the core worker is killed by SIGKILL, the child processes
Expand Down
52 changes: 41 additions & 11 deletions src/ray/rpc/grpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "ray/common/status.h"
#include "ray/rpc/client_call.h"
#include "ray/rpc/common.h"
#include "ray/rpc/rpc_chaos.h"

namespace ray {
namespace rpc {
Expand Down Expand Up @@ -148,15 +149,43 @@ class GrpcClient {
const ClientCallback<Reply> &callback,
std::string call_name = "UNKNOWN_RPC",
int64_t method_timeout_ms = -1) {
auto call = client_call_manager_.CreateCall<GrpcService, Request, Reply>(
*stub_,
prepare_async_function,
request,
callback,
std::move(call_name),
method_timeout_ms);
RAY_CHECK(call != nullptr);
call_method_invoked_ = true;
testing::RpcFailure failure = testing::get_rpc_failure(call_name);
if (failure == testing::RpcFailure::Request) {
// Simulate the case where the PRC fails before server receives
// the request.
RAY_LOG(INFO) << "Inject RPC request failure for " << call_name;
client_call_manager_.GetMainService().post(
[callback]() {
callback(Status::RpcError("Unavailable", grpc::StatusCode::UNAVAILABLE),
Reply());
},
"RpcChaos");
} else if (failure == testing::RpcFailure::Response) {
// Simulate the case where the RPC fails after server sends
// the response.
RAY_LOG(INFO) << "Inject RPC response failure for " << call_name;
client_call_manager_.CreateCall<GrpcService, Request, Reply>(
*stub_,
prepare_async_function,
request,
[callback](const Status &status, Reply &&reply) {
callback(Status::RpcError("Unavailable", grpc::StatusCode::UNAVAILABLE),
Reply());
},
std::move(call_name),
method_timeout_ms);
} else {
auto call = client_call_manager_.CreateCall<GrpcService, Request, Reply>(
*stub_,
prepare_async_function,
request,
callback,
std::move(call_name),
method_timeout_ms);
RAY_CHECK(call != nullptr);
}

call_method_invoked_.store(true);
}

std::shared_ptr<grpc::Channel> Channel() const { return channel_; }
Expand All @@ -167,7 +196,8 @@ class GrpcClient {
/// Also see https://grpc.github.io/grpc/core/md_doc_connectivity-semantics-and-api.html
/// for channel connectivity state machine.
bool IsChannelIdleAfterRPCs() const {
return (channel_->GetState(false) == GRPC_CHANNEL_IDLE) && call_method_invoked_;
return (channel_->GetState(false) == GRPC_CHANNEL_IDLE) &&
call_method_invoked_.load();
}

private:
Expand All @@ -179,7 +209,7 @@ class GrpcClient {
/// The channel of the stub.
std::shared_ptr<grpc::Channel> channel_;
/// Whether CallMethod is invoked.
bool call_method_invoked_ = false;
std::atomic<bool> call_method_invoked_ = false;
};

} // namespace rpc
Expand Down
109 changes: 109 additions & 0 deletions src/ray/rpc/rpc_chaos.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2024 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "ray/rpc/rpc_chaos.h"

#include <random>
#include <unordered_map>

#include "absl/synchronization/mutex.h"
#include "ray/common/ray_config.h"

namespace ray {
namespace rpc {
namespace testing {
namespace {

/*
RpcFailureManager is a simple chaos testing framework. Before starting ray, users
should set up os environment to use this feature for testing purposes.
To use this, simply do
export RAY_testing_rpc_failure="method1=3,method2=5"
Key is the RPC call name and value is the max number of failures to inject.
*/
class RpcFailureManager {
public:
RpcFailureManager() { Init(); }

void Init() {
absl::MutexLock lock(&mu_);

failable_methods_.clear();

if (!RayConfig::instance().testing_rpc_failure().empty()) {
for (const auto &item :
absl::StrSplit(RayConfig::instance().testing_rpc_failure(), ",")) {
std::vector<std::string> parts = absl::StrSplit(item, "=");
RAY_CHECK_EQ(parts.size(), 2UL);
failable_methods_.emplace(parts[0], std::atoi(parts[1].c_str()));
}

std::random_device rd;
auto seed = rd();
RAY_LOG(INFO) << "Setting RpcFailureManager seed to " << seed;
gen_.seed(seed);
}
}

RpcFailure GetRpcFailure(const std::string &name) {
absl::MutexLock lock(&mu_);

if (failable_methods_.find(name) == failable_methods_.end()) {
return RpcFailure::None;
}

uint64_t &num_remaining_failures = failable_methods_.at(name);
if (num_remaining_failures == 0) {
return RpcFailure::None;
}

std::uniform_int_distribution<int> dist(0, 3);
int rand = dist(gen_);
if (rand == 0) {
// 25% chance
num_remaining_failures--;
return RpcFailure::Request;
} else if (rand == 1) {
// 25% chance
num_remaining_failures--;
return RpcFailure::Response;
} else {
// 50% chance
return RpcFailure::None;
}
}

private:
absl::Mutex mu_;
std::mt19937 gen_;
// call name -> # remaining failures
std::unordered_map<std::string, uint64_t> failable_methods_ ABSL_GUARDED_BY(&mu_);
};

static RpcFailureManager _rpc_failure_manager;

} // namespace

RpcFailure get_rpc_failure(const std::string &name) {
if (RayConfig::instance().testing_rpc_failure().empty()) {
return RpcFailure::None;
}
return _rpc_failure_manager.GetRpcFailure(name);
}

void init() { _rpc_failure_manager.Init(); }

} // namespace testing
} // namespace rpc
} // namespace ray
37 changes: 37 additions & 0 deletions src/ray/rpc/rpc_chaos.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2024 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <string>

namespace ray {
namespace rpc {
namespace testing {

enum class RpcFailure {
None,
// Failure before server receives the request
Request,
// Failure after server sends the response
Response,
};

RpcFailure get_rpc_failure(const std::string &name);

void init();

} // namespace testing
} // namespace rpc
} // namespace ray
34 changes: 34 additions & 0 deletions src/ray/rpc/test/rpc_chaos_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2024 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "ray/rpc/rpc_chaos.h"

#include <string>

#include "gtest/gtest.h"
#include "ray/common/ray_config.h"

TEST(RpcChaosTest, Basic) {
RayConfig::instance().testing_rpc_failure() = "method1=0,method2=1";
ray::rpc::testing::init();
ASSERT_EQ(ray::rpc::testing::get_rpc_failure("unknown"),
ray::rpc::testing::RpcFailure::None);
ASSERT_EQ(ray::rpc::testing::get_rpc_failure("method1"),
ray::rpc::testing::RpcFailure::None);
// At most one failure.
ASSERT_FALSE(ray::rpc::testing::get_rpc_failure("method2") !=
ray::rpc::testing::RpcFailure::None &&
ray::rpc::testing::get_rpc_failure("method2") !=
ray::rpc::testing::RpcFailure::None);
}

0 comments on commit d5fa9a0

Please sign in to comment.