Skip to content

Commit

Permalink
Revert "[core] Retry failed redis request (ray-project#35249) (ray-pr…
Browse files Browse the repository at this point in the history
…oject#35481)"

This reverts commit 64e4926.
  • Loading branch information
architkulkarni committed May 19, 2023
1 parent 1ac29e8 commit 50db62d
Show file tree
Hide file tree
Showing 11 changed files with 201 additions and 225 deletions.
25 changes: 0 additions & 25 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2619,31 +2619,6 @@ cc_test(
],
)

cc_test(
name = "chaos_redis_store_client_test",
size = "small",
srcs = ["src/ray/gcs/store_client/test/redis_store_client_test.cc"],
args = [
"$(location redis-server)",
"$(location redis-cli)",
],
copts = COPTS,
data = [
"//:redis-cli",
"//:redis-server",
],
env = {"REDIS_CHAOS": "1"},
tags = ["team:core"],
target_compatible_with = [
"@platforms//os:linux",
],
deps = [
":redis_store_client",
":store_client_test_lib",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "in_memory_store_client_test",
size = "small",
Expand Down
9 changes: 0 additions & 9 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,15 +311,6 @@ RAY_CONFIG(int, worker_niceness, 15)
RAY_CONFIG(int64_t, redis_db_connect_retries, 600)
RAY_CONFIG(int64_t, redis_db_connect_wait_milliseconds, 100)

/// Number of retries for a redis request failure.
RAY_CONFIG(size_t, num_redis_request_retries, 5)

/// Exponential backoff setup. By default:
/// 100ms, 200ms, 400ms, 800ms, 1s, 1s,...
RAY_CONFIG(int64_t, redis_retry_base_ms, 100)
RAY_CONFIG(int64_t, redis_retry_multiplier, 2)
RAY_CONFIG(int64_t, redis_retry_max_ms, 1000)

/// The object manager's global timer interval in milliseconds.
RAY_CONFIG(int, object_manager_timer_freq_ms, 100)

Expand Down
31 changes: 8 additions & 23 deletions src/ray/common/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,18 @@

namespace ray {

void TestSetupUtil::StartUpRedisServers(const std::vector<int> &redis_server_ports,
bool save) {
void TestSetupUtil::StartUpRedisServers(const std::vector<int> &redis_server_ports) {
if (redis_server_ports.empty()) {
TEST_REDIS_SERVER_PORTS.push_back(StartUpRedisServer(0, save));
TEST_REDIS_SERVER_PORTS.push_back(StartUpRedisServer(0));
} else {
for (const auto &port : redis_server_ports) {
TEST_REDIS_SERVER_PORTS.push_back(StartUpRedisServer(port, save));
TEST_REDIS_SERVER_PORTS.push_back(StartUpRedisServer(port));
}
}
}

// start a redis server with specified port, use random one when 0 given
int TestSetupUtil::StartUpRedisServer(int port, bool save) {
int TestSetupUtil::StartUpRedisServer(const int &port) {
int actual_port = port;
if (port == 0) {
static std::atomic<bool> srand_called(false);
Expand All @@ -59,12 +58,8 @@ int TestSetupUtil::StartUpRedisServer(int port, bool save) {
#ifdef _WIN32
std::vector<std::string> cmdargs({program, "--loglevel", "warning"});
#else
std::vector<std::string> cmdargs;
if (!save) {
cmdargs = {program, "--loglevel", "warning", "--save", "", "--appendonly", "no"};
} else {
cmdargs = {program, "--loglevel", "warning"};
}
std::vector<std::string> cmdargs(
{program, "--loglevel", "warning", "--save", "", "--appendonly", "no"});
#endif
cmdargs.insert(cmdargs.end(), {"--port", std::to_string(actual_port)});
RAY_LOG(INFO) << "Start redis command is: " << CreateCommandLine(cmdargs);
Expand All @@ -80,7 +75,7 @@ void TestSetupUtil::ShutDownRedisServers() {
TEST_REDIS_SERVER_PORTS = std::vector<int>();
}

void TestSetupUtil::ShutDownRedisServer(int port) {
void TestSetupUtil::ShutDownRedisServer(const int &port) {
std::vector<std::string> cmdargs(
{TEST_REDIS_CLIENT_EXEC_PATH, "-p", std::to_string(port), "shutdown"});
RAY_LOG(INFO) << "Stop redis command is: " << CreateCommandLine(cmdargs);
Expand All @@ -96,17 +91,7 @@ void TestSetupUtil::FlushAllRedisServers() {
}
}

void TestSetupUtil::ExecuteRedisCmd(int port, std::vector<std::string> cmd) {
std::vector<std::string> cmdargs(
{TEST_REDIS_CLIENT_EXEC_PATH, "-p", std::to_string(port)});
cmdargs.insert(cmdargs.end(), cmd.begin(), cmd.end());
RAY_LOG(INFO) << "Send command to redis: " << CreateCommandLine(cmdargs);
if (Process::Call(cmdargs)) {
RAY_LOG(WARNING) << "Failed to send request to redis.";
}
}

void TestSetupUtil::FlushRedisServer(int port) {
void TestSetupUtil::FlushRedisServer(const int &port) {
std::vector<std::string> cmdargs(
{TEST_REDIS_CLIENT_EXEC_PATH, "-p", std::to_string(port), "flushall"});
RAY_LOG(INFO) << "Cleaning up redis with command: " << CreateCommandLine(cmdargs);
Expand Down
12 changes: 6 additions & 6 deletions src/ray/common/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ extern std::string TEST_MOCK_WORKER_EXEC_PATH;
/// 5. start/stop raylet monitor
class TestSetupUtil {
public:
static void StartUpRedisServers(const std::vector<int> &redis_server_ports,
bool save = false);
static void StartUpRedisServers(const std::vector<int> &redis_server_ports);
static void ShutDownRedisServers();
static void FlushAllRedisServers();

Expand All @@ -125,10 +124,11 @@ class TestSetupUtil {
const std::string &resource,
std::string *store_socket_name);
static void StopRaylet(const std::string &raylet_socket_name);
static void ExecuteRedisCmd(int port, std::vector<std::string> cmd);
static int StartUpRedisServer(int port, bool save = false);
static void ShutDownRedisServer(int port);
static void FlushRedisServer(int port);

private:
static int StartUpRedisServer(const int &port);
static void ShutDownRedisServer(const int &port);
static void FlushRedisServer(const int &port);
};

} // namespace ray
8 changes: 7 additions & 1 deletion src/ray/gcs/gcs_server/gcs_redis_failure_detector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,13 @@ void GcsRedisFailureDetector::DetectRedis() {
callback_();
}
};
redis_context_->RunArgvAsync({"PING"}, redis_callback);

Status status = redis_context_->RunArgvAsync({"PING"}, redis_callback);

if (!status.ok()) {
RAY_LOG(ERROR) << "Redis is disconnected.";
callback_();
}
}

} // namespace gcs
Expand Down
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class GcsServerTest : public ::testing::Test {
gcs_server_->Stop();
thread_io_service_->join();
gcs_server_.reset();
ray::gcs::RedisCallbackManager::instance().Clear();
rpc::ResetServerCallExecutor();
}

Expand Down
163 changes: 89 additions & 74 deletions src/ray/gcs/redis_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include <sstream>

#include "ray/common/asio/asio_util.h"
#include "ray/stats/metric_defs.h"
#include "ray/util/util.h"

Expand All @@ -27,10 +26,33 @@ extern "C" {
}

// TODO(pcm): Integrate into the C++ tree.
#include "absl/strings/str_join.h"
#include "absl/strings/str_split.h"
#include "ray/common/ray_config.h"

namespace {

/// A helper function to call the callback and delete it from the callback
/// manager if necessary.
void ProcessCallback(int64_t callback_index,
std::shared_ptr<ray::gcs::CallbackReply> callback_reply) {
RAY_CHECK(callback_index >= 0) << "The callback index must be greater than 0, "
<< "but it actually is " << callback_index;
auto callback_item =
ray::gcs::RedisCallbackManager::instance().GetCallback(callback_index);

// Record the redis latency
auto end_time = absl::GetCurrentTimeNanos() / 1000;
ray::stats::GcsLatency().Record(end_time - callback_item->start_time_);

// Dispatch the callback.
callback_item->Dispatch(callback_reply);

// Delete the callback
ray::gcs::RedisCallbackManager::instance().RemoveCallback(callback_index);
}

} // namespace

namespace ray {

namespace gcs {
Expand All @@ -43,7 +65,7 @@ CallbackReply::CallbackReply(redisReply *redis_reply) : reply_type_(redis_reply-
break;
}
case REDIS_REPLY_ERROR: {
RAY_LOG(FATAL) << "Got an error in redis reply: " << redis_reply->str;
RAY_CHECK(false) << "Got an error in redis reply: " << redis_reply->str;
break;
}
case REDIS_REPLY_INTEGER: {
Expand Down Expand Up @@ -77,8 +99,6 @@ CallbackReply::CallbackReply(redisReply *redis_reply) : reply_type_(redis_reply-
}
}

bool CallbackReply::IsError() const { return reply_type_ == REDIS_REPLY_ERROR; }

void CallbackReply::ParseAsStringArrayOrScanArray(redisReply *redis_reply) {
RAY_CHECK(REDIS_REPLY_ARRAY == redis_reply->type);
const auto array_size = static_cast<size_t>(redis_reply->elements);
Expand Down Expand Up @@ -152,72 +172,55 @@ const std::vector<std::optional<std::string>> &CallbackReply::ReadAsStringArray(
return string_array_reply_;
}

RedisRequestContext::RedisRequestContext(instrumented_io_context &io_service,
RedisCallback callback,
RedisAsyncContext *context,
std::vector<std::string> args)
: exp_back_off_(RayConfig::instance().redis_retry_base_ms(),
RayConfig::instance().redis_retry_multiplier(),
RayConfig::instance().redis_retry_max_ms()),
io_service_(io_service),
redis_context_(context),
pending_retries_(RayConfig::instance().num_redis_request_retries() + 1),
callback_(std::move(callback)),
start_time_(absl::Now()),
redis_cmds_(std::move(args)) {
for (size_t i = 0; i < redis_cmds_.size(); ++i) {
argv_.push_back(redis_cmds_[i].data());
argc_.push_back(redis_cmds_[i].size());
// This is a global redis callback which will be registered for every
// asynchronous redis call. It dispatches the appropriate callback
// that was registered with the RedisCallbackManager.
void GlobalRedisCallback(void *c, void *r, void *privdata) {
if (r == nullptr) {
return;
}
int64_t callback_index = reinterpret_cast<int64_t>(privdata);
redisReply *reply = reinterpret_cast<redisReply *>(r);
ProcessCallback(callback_index, std::make_shared<CallbackReply>(reply));
}

int64_t RedisCallbackManager::AllocateCallbackIndex() {
std::lock_guard<std::mutex> lock(mutex_);
return num_callbacks_++;
}

void RedisRequestContext::Run() {
if (pending_retries_ == 0) {
RAY_LOG(FATAL) << "Failed to run redis cmds: [" << absl::StrJoin(redis_cmds_, " ")
<< "] for " << RayConfig::instance().num_redis_request_retries()
<< " times.";
}

--pending_retries_;

auto fn =
+[](struct redisAsyncContext *async_context, void *raw_reply, void *privdata) {
auto *request_cxt = (RedisRequestContext *)privdata;
auto redis_reply = reinterpret_cast<redisReply *>(raw_reply);
// Error happened.
if (redis_reply == nullptr || redis_reply->type == REDIS_REPLY_ERROR) {
auto error_msg = redis_reply ? redis_reply->str : async_context->errstr;
RAY_LOG(ERROR) << "Redis request ["
<< absl::StrJoin(request_cxt->redis_cmds_, " ") << "]"
<< " failed due to error " << error_msg << ". "
<< request_cxt->pending_retries_ << " retries left.";
auto delay = request_cxt->exp_back_off_.Current();
request_cxt->exp_back_off_.Next();
// Retry the request after a while.
execute_after(
request_cxt->io_service_,
[request_cxt]() { request_cxt->Run(); },
std::chrono::milliseconds(delay));
} else {
auto reply = std::make_shared<CallbackReply>(redis_reply);
request_cxt->io_service_.post(
[reply, callback = std::move(request_cxt->callback_)]() {
callback(std::move(reply));
},
"RedisRequestContext.Callback");
auto end_time = absl::Now();
ray::stats::GcsLatency().Record((end_time - request_cxt->start_time_) /
absl::Milliseconds(1));
delete request_cxt;
}
};

Status status = redis_context_->RedisAsyncCommandArgv(
fn, this, argv_.size(), argv_.data(), argc_.data());

if (!status.ok()) {
fn(redis_context_->GetRawRedisAsyncContext(), nullptr, this);
int64_t RedisCallbackManager::AddCallback(const RedisCallback &function,
instrumented_io_context &io_service,
int64_t callback_index) {
auto start_time = absl::GetCurrentTimeNanos() / 1000;

std::lock_guard<std::mutex> lock(mutex_);
if (callback_index == -1) {
// No callback index was specified. Allocate a new callback index.
callback_index = num_callbacks_;
num_callbacks_++;
}
callback_items_.emplace(
callback_index, std::make_shared<CallbackItem>(function, start_time, io_service));
return callback_index;
}

std::shared_ptr<RedisCallbackManager::CallbackItem> RedisCallbackManager::GetCallback(
int64_t callback_index) const {
std::lock_guard<std::mutex> lock(mutex_);
auto it = callback_items_.find(callback_index);
RAY_CHECK(it != callback_items_.end()) << callback_index;
return it->second;
}

void RedisCallbackManager::Clear() {
std::lock_guard<std::mutex> lock(mutex_);
callback_items_.clear();
}

void RedisCallbackManager::RemoveCallback(int64_t callback_index) {
std::lock_guard<std::mutex> lock(mutex_);
callback_items_.erase(callback_index);
}

#define REDIS_CHECK_ERROR(CONTEXT, REPLY) \
Expand Down Expand Up @@ -547,14 +550,26 @@ std::unique_ptr<CallbackReply> RedisContext::RunArgvSync(
return callback_reply;
}

void RedisContext::RunArgvAsync(std::vector<std::string> args,
RedisCallback redis_callback) {
Status RedisContext::RunArgvAsync(const std::vector<std::string> &args,
const RedisCallback &redis_callback) {
RAY_CHECK(redis_async_context_);
auto request_context = new RedisRequestContext(io_service_,
std::move(redis_callback),
redis_async_context_.get(),
std::move(args));
request_context->Run();
// Build the arguments.
std::vector<const char *> argv;
std::vector<size_t> argc;
for (size_t i = 0; i < args.size(); ++i) {
argv.push_back(args[i].data());
argc.push_back(args[i].size());
}
int64_t callback_index =
RedisCallbackManager::instance().AddCallback(redis_callback, io_service_);
// Run the Redis command.
Status status = redis_async_context_->RedisAsyncCommandArgv(
reinterpret_cast<redisCallbackFn *>(&GlobalRedisCallback),
reinterpret_cast<void *>(callback_index),
args.size(),
argv.data(),
argc.data());
return status;
}

void RedisContext::FreeRedisReply(void *reply) { return freeReplyObject(reply); }
Expand Down
Loading

0 comments on commit 50db62d

Please sign in to comment.