Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cherry-pick][core] Retry failed redis request (#35249) #35481

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2619,6 +2619,31 @@ cc_test(
],
)

# Test target that builds redis_store_client_test.cc and runs it with
# REDIS_CHAOS=1 exported in the test environment.
# NOTE(review): presumably the test source checks REDIS_CHAOS to inject redis
# failures and exercise the request-retry path — confirm in the test itself.
cc_test(
name = "chaos_redis_store_client_test",
size = "small",
srcs = ["src/ray/gcs/store_client/test/redis_store_client_test.cc"],
# Locations of the redis-server and redis-cli targets listed in `data`,
# handed to the test binary as command-line arguments.
args = [
"$(location redis-server)",
"$(location redis-cli)",
],
copts = COPTS,
# Runtime dependencies: the redis binaries referenced by `args`.
data = [
"//:redis-cli",
"//:redis-server",
],
# The only visible difference from a plain run of this test source: the
# REDIS_CHAOS environment variable is set to "1".
env = {"REDIS_CHAOS": "1"},
tags = ["team:core"],
# Restrict this target to Linux.
target_compatible_with = [
"@platforms//os:linux",
],
deps = [
":redis_store_client",
":store_client_test_lib",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "in_memory_store_client_test",
size = "small",
Expand Down
9 changes: 9 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,15 @@ RAY_CONFIG(int, worker_niceness, 15)
RAY_CONFIG(int64_t, redis_db_connect_retries, 600)
RAY_CONFIG(int64_t, redis_db_connect_wait_milliseconds, 100)

/// Number of times a failed redis request is retried, in addition to the
/// initial attempt (the request context allows this value + 1 attempts before
/// it gives up with a fatal log).
RAY_CONFIG(size_t, num_redis_request_retries, 5)

/// Exponential backoff setup for the delay between redis request retries.
/// The three values are, in order, the base delay, the multiplier and the
/// maximum delay, all delays being in milliseconds. By default:
/// 100ms, 200ms, 400ms, 800ms, 1s, 1s,...
RAY_CONFIG(int64_t, redis_retry_base_ms, 100)
RAY_CONFIG(int64_t, redis_retry_multiplier, 2)
RAY_CONFIG(int64_t, redis_retry_max_ms, 1000)

/// The object manager's global timer interval in milliseconds.
RAY_CONFIG(int, object_manager_timer_freq_ms, 100)

Expand Down
31 changes: 23 additions & 8 deletions src/ray/common/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,19 @@

namespace ray {

void TestSetupUtil::StartUpRedisServers(const std::vector<int> &redis_server_ports) {
void TestSetupUtil::StartUpRedisServers(const std::vector<int> &redis_server_ports,
bool save) {
if (redis_server_ports.empty()) {
TEST_REDIS_SERVER_PORTS.push_back(StartUpRedisServer(0));
TEST_REDIS_SERVER_PORTS.push_back(StartUpRedisServer(0, save));
} else {
for (const auto &port : redis_server_ports) {
TEST_REDIS_SERVER_PORTS.push_back(StartUpRedisServer(port));
TEST_REDIS_SERVER_PORTS.push_back(StartUpRedisServer(port, save));
}
}
}

// Start a redis server on the specified port; a random port is used when 0 is given.
int TestSetupUtil::StartUpRedisServer(const int &port) {
int TestSetupUtil::StartUpRedisServer(int port, bool save) {
int actual_port = port;
if (port == 0) {
static std::atomic<bool> srand_called(false);
Expand All @@ -58,8 +59,12 @@ int TestSetupUtil::StartUpRedisServer(const int &port) {
#ifdef _WIN32
std::vector<std::string> cmdargs({program, "--loglevel", "warning"});
#else
std::vector<std::string> cmdargs(
{program, "--loglevel", "warning", "--save", "", "--appendonly", "no"});
std::vector<std::string> cmdargs;
if (!save) {
cmdargs = {program, "--loglevel", "warning", "--save", "", "--appendonly", "no"};
} else {
cmdargs = {program, "--loglevel", "warning"};
}
#endif
cmdargs.insert(cmdargs.end(), {"--port", std::to_string(actual_port)});
RAY_LOG(INFO) << "Start redis command is: " << CreateCommandLine(cmdargs);
Expand All @@ -75,7 +80,7 @@ void TestSetupUtil::ShutDownRedisServers() {
TEST_REDIS_SERVER_PORTS = std::vector<int>();
}

void TestSetupUtil::ShutDownRedisServer(const int &port) {
void TestSetupUtil::ShutDownRedisServer(int port) {
std::vector<std::string> cmdargs(
{TEST_REDIS_CLIENT_EXEC_PATH, "-p", std::to_string(port), "shutdown"});
RAY_LOG(INFO) << "Stop redis command is: " << CreateCommandLine(cmdargs);
Expand All @@ -91,7 +96,17 @@ void TestSetupUtil::FlushAllRedisServers() {
}
}

void TestSetupUtil::FlushRedisServer(const int &port) {
void TestSetupUtil::ExecuteRedisCmd(int port, std::vector<std::string> cmd) {
std::vector<std::string> cmdargs(
{TEST_REDIS_CLIENT_EXEC_PATH, "-p", std::to_string(port)});
cmdargs.insert(cmdargs.end(), cmd.begin(), cmd.end());
RAY_LOG(INFO) << "Send command to redis: " << CreateCommandLine(cmdargs);
if (Process::Call(cmdargs)) {
RAY_LOG(WARNING) << "Failed to send request to redis.";
}
}

void TestSetupUtil::FlushRedisServer(int port) {
std::vector<std::string> cmdargs(
{TEST_REDIS_CLIENT_EXEC_PATH, "-p", std::to_string(port), "flushall"});
RAY_LOG(INFO) << "Cleaning up redis with command: " << CreateCommandLine(cmdargs);
Expand Down
12 changes: 6 additions & 6 deletions src/ray/common/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ extern std::string TEST_MOCK_WORKER_EXEC_PATH;
/// 5. start/stop raylet monitor
class TestSetupUtil {
public:
static void StartUpRedisServers(const std::vector<int> &redis_server_ports);
static void StartUpRedisServers(const std::vector<int> &redis_server_ports,
bool save = false);
static void ShutDownRedisServers();
static void FlushAllRedisServers();

Expand All @@ -124,11 +125,10 @@ class TestSetupUtil {
const std::string &resource,
std::string *store_socket_name);
static void StopRaylet(const std::string &raylet_socket_name);

private:
static int StartUpRedisServer(const int &port);
static void ShutDownRedisServer(const int &port);
static void FlushRedisServer(const int &port);
static void ExecuteRedisCmd(int port, std::vector<std::string> cmd);
static int StartUpRedisServer(int port, bool save = false);
static void ShutDownRedisServer(int port);
static void FlushRedisServer(int port);
};

} // namespace ray
8 changes: 1 addition & 7 deletions src/ray/gcs/gcs_server/gcs_redis_failure_detector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,7 @@ void GcsRedisFailureDetector::DetectRedis() {
callback_();
}
};

Status status = redis_context_->RunArgvAsync({"PING"}, redis_callback);

if (!status.ok()) {
RAY_LOG(ERROR) << "Redis is disconnected.";
callback_();
}
redis_context_->RunArgvAsync({"PING"}, redis_callback);
}

} // namespace gcs
Expand Down
1 change: 0 additions & 1 deletion src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ class GcsServerTest : public ::testing::Test {
gcs_server_->Stop();
thread_io_service_->join();
gcs_server_.reset();
ray::gcs::RedisCallbackManager::instance().Clear();
rpc::ResetServerCallExecutor();
}

Expand Down
163 changes: 74 additions & 89 deletions src/ray/gcs/redis_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <sstream>

#include "ray/common/asio/asio_util.h"
#include "ray/stats/metric_defs.h"
#include "ray/util/util.h"

Expand All @@ -26,33 +27,10 @@ extern "C" {
}

// TODO(pcm): Integrate into the C++ tree.
#include "absl/strings/str_join.h"
#include "absl/strings/str_split.h"
#include "ray/common/ray_config.h"

namespace {

/// A helper function to call the callback and delete it from the callback
/// manager if necessary.
void ProcessCallback(int64_t callback_index,
std::shared_ptr<ray::gcs::CallbackReply> callback_reply) {
RAY_CHECK(callback_index >= 0) << "The callback index must be greater than 0, "
<< "but it actually is " << callback_index;
auto callback_item =
ray::gcs::RedisCallbackManager::instance().GetCallback(callback_index);

// Record the redis latency
auto end_time = absl::GetCurrentTimeNanos() / 1000;
ray::stats::GcsLatency().Record(end_time - callback_item->start_time_);

// Dispatch the callback.
callback_item->Dispatch(callback_reply);

// Delete the callback
ray::gcs::RedisCallbackManager::instance().RemoveCallback(callback_index);
}

} // namespace

namespace ray {

namespace gcs {
Expand All @@ -65,7 +43,7 @@ CallbackReply::CallbackReply(redisReply *redis_reply) : reply_type_(redis_reply-
break;
}
case REDIS_REPLY_ERROR: {
RAY_CHECK(false) << "Got an error in redis reply: " << redis_reply->str;
RAY_LOG(FATAL) << "Got an error in redis reply: " << redis_reply->str;
break;
}
case REDIS_REPLY_INTEGER: {
Expand Down Expand Up @@ -99,6 +77,8 @@ CallbackReply::CallbackReply(redisReply *redis_reply) : reply_type_(redis_reply-
}
}

/// Reports whether the type recorded for this reply is REDIS_REPLY_ERROR.
bool CallbackReply::IsError() const {
  return REDIS_REPLY_ERROR == reply_type_;
}

void CallbackReply::ParseAsStringArrayOrScanArray(redisReply *redis_reply) {
RAY_CHECK(REDIS_REPLY_ARRAY == redis_reply->type);
const auto array_size = static_cast<size_t>(redis_reply->elements);
Expand Down Expand Up @@ -172,55 +152,72 @@ const std::vector<std::optional<std::string>> &CallbackReply::ReadAsStringArray(
return string_array_reply_;
}

// This is a global redis callback which will be registered for every
// asynchronous redis call. It dispatches the appropriate callback
// that was registered with the RedisCallbackManager.
void GlobalRedisCallback(void *c, void *r, void *privdata) {
if (r == nullptr) {
return;
RedisRequestContext::RedisRequestContext(instrumented_io_context &io_service,
RedisCallback callback,
RedisAsyncContext *context,
std::vector<std::string> args)
: exp_back_off_(RayConfig::instance().redis_retry_base_ms(),
RayConfig::instance().redis_retry_multiplier(),
RayConfig::instance().redis_retry_max_ms()),
io_service_(io_service),
redis_context_(context),
pending_retries_(RayConfig::instance().num_redis_request_retries() + 1),
callback_(std::move(callback)),
start_time_(absl::Now()),
redis_cmds_(std::move(args)) {
for (size_t i = 0; i < redis_cmds_.size(); ++i) {
argv_.push_back(redis_cmds_[i].data());
argc_.push_back(redis_cmds_[i].size());
}
int64_t callback_index = reinterpret_cast<int64_t>(privdata);
redisReply *reply = reinterpret_cast<redisReply *>(r);
ProcessCallback(callback_index, std::make_shared<CallbackReply>(reply));
}

int64_t RedisCallbackManager::AllocateCallbackIndex() {
std::lock_guard<std::mutex> lock(mutex_);
return num_callbacks_++;
}

int64_t RedisCallbackManager::AddCallback(const RedisCallback &function,
instrumented_io_context &io_service,
int64_t callback_index) {
auto start_time = absl::GetCurrentTimeNanos() / 1000;

std::lock_guard<std::mutex> lock(mutex_);
if (callback_index == -1) {
// No callback index was specified. Allocate a new callback index.
callback_index = num_callbacks_;
num_callbacks_++;
void RedisRequestContext::Run() {
if (pending_retries_ == 0) {
RAY_LOG(FATAL) << "Failed to run redis cmds: [" << absl::StrJoin(redis_cmds_, " ")
<< "] for " << RayConfig::instance().num_redis_request_retries()
<< " times.";
}

--pending_retries_;

auto fn =
+[](struct redisAsyncContext *async_context, void *raw_reply, void *privdata) {
auto *request_cxt = (RedisRequestContext *)privdata;
auto redis_reply = reinterpret_cast<redisReply *>(raw_reply);
// Error happened.
if (redis_reply == nullptr || redis_reply->type == REDIS_REPLY_ERROR) {
auto error_msg = redis_reply ? redis_reply->str : async_context->errstr;
RAY_LOG(ERROR) << "Redis request ["
<< absl::StrJoin(request_cxt->redis_cmds_, " ") << "]"
<< " failed due to error " << error_msg << ". "
<< request_cxt->pending_retries_ << " retries left.";
auto delay = request_cxt->exp_back_off_.Current();
request_cxt->exp_back_off_.Next();
// Retry the request after a while.
execute_after(
request_cxt->io_service_,
[request_cxt]() { request_cxt->Run(); },
std::chrono::milliseconds(delay));
} else {
auto reply = std::make_shared<CallbackReply>(redis_reply);
request_cxt->io_service_.post(
[reply, callback = std::move(request_cxt->callback_)]() {
callback(std::move(reply));
},
"RedisRequestContext.Callback");
auto end_time = absl::Now();
ray::stats::GcsLatency().Record((end_time - request_cxt->start_time_) /
absl::Milliseconds(1));
delete request_cxt;
}
};

Status status = redis_context_->RedisAsyncCommandArgv(
fn, this, argv_.size(), argv_.data(), argc_.data());

if (!status.ok()) {
fn(redis_context_->GetRawRedisAsyncContext(), nullptr, this);
}
callback_items_.emplace(
callback_index, std::make_shared<CallbackItem>(function, start_time, io_service));
return callback_index;
}

std::shared_ptr<RedisCallbackManager::CallbackItem> RedisCallbackManager::GetCallback(
int64_t callback_index) const {
std::lock_guard<std::mutex> lock(mutex_);
auto it = callback_items_.find(callback_index);
RAY_CHECK(it != callback_items_.end()) << callback_index;
return it->second;
}

void RedisCallbackManager::Clear() {
std::lock_guard<std::mutex> lock(mutex_);
callback_items_.clear();
}

void RedisCallbackManager::RemoveCallback(int64_t callback_index) {
std::lock_guard<std::mutex> lock(mutex_);
callback_items_.erase(callback_index);
}

#define REDIS_CHECK_ERROR(CONTEXT, REPLY) \
Expand Down Expand Up @@ -550,26 +547,14 @@ std::unique_ptr<CallbackReply> RedisContext::RunArgvSync(
return callback_reply;
}

Status RedisContext::RunArgvAsync(const std::vector<std::string> &args,
const RedisCallback &redis_callback) {
void RedisContext::RunArgvAsync(std::vector<std::string> args,
RedisCallback redis_callback) {
RAY_CHECK(redis_async_context_);
// Build the arguments.
std::vector<const char *> argv;
std::vector<size_t> argc;
for (size_t i = 0; i < args.size(); ++i) {
argv.push_back(args[i].data());
argc.push_back(args[i].size());
}
int64_t callback_index =
RedisCallbackManager::instance().AddCallback(redis_callback, io_service_);
// Run the Redis command.
Status status = redis_async_context_->RedisAsyncCommandArgv(
reinterpret_cast<redisCallbackFn *>(&GlobalRedisCallback),
reinterpret_cast<void *>(callback_index),
args.size(),
argv.data(),
argc.data());
return status;
auto request_context = new RedisRequestContext(io_service_,
std::move(redis_callback),
redis_async_context_.get(),
std::move(args));
request_context->Run();
}

void RedisContext::FreeRedisReply(void *reply) { return freeReplyObject(reply); }
Expand Down
Loading