From 682e9256ac31922da5e3bfb6d3775f43ea747bf6 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Wed, 17 May 2023 17:10:01 -0700 Subject: [PATCH] [core] Retry failed redis request (#35249) ## Why are these changes needed? After https://github.com/ray-project/ray/pull/35123 the request in ray is serialized and we should be able to retry the failed redis request in ray. This PR refactor the RedisContext a little bit by remove the CallbackItem and introduce the RequestContext. In side the RequestContext, the failed request will be retried automatically. If still failed in the end, it'll just crash. ## Related issue number https://github.com/ray-project/ray/issues/34014 --- BUILD.bazel | 25 +++ src/ray/common/ray_config_def.h | 9 + src/ray/common/test_util.cc | 31 +++- src/ray/common/test_util.h | 12 +- .../gcs_server/gcs_redis_failure_detector.cc | 8 +- .../gcs_server/test/gcs_server_rpc_test.cc | 1 - src/ray/gcs/redis_context.cc | 163 ++++++++---------- src/ray/gcs/redis_context.h | 76 +++----- .../gcs/store_client/redis_store_client.cc | 37 ++-- .../test/redis_store_client_test.cc | 39 ++++- .../test/store_client_test_base.h | 25 ++- 11 files changed, 225 insertions(+), 201 deletions(-) diff --git a/BUILD.bazel b/BUILD.bazel index 4212487d267b..66912546b4ec 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -2619,6 +2619,31 @@ cc_test( ], ) +cc_test( + name = "chaos_redis_store_client_test", + size = "small", + srcs = ["src/ray/gcs/store_client/test/redis_store_client_test.cc"], + args = [ + "$(location redis-server)", + "$(location redis-cli)", + ], + copts = COPTS, + data = [ + "//:redis-cli", + "//:redis-server", + ], + env = {"REDIS_CHAOS": "1"}, + tags = ["team:core"], + target_compatible_with = [ + "@platforms//os:linux", + ], + deps = [ + ":redis_store_client", + ":store_client_test_lib", + "@com_google_googletest//:gtest_main", + ], +) + cc_test( name = "in_memory_store_client_test", size = "small", diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index affb9ec7c9a5..0d4806ec45fe 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -311,6 +311,15 @@ RAY_CONFIG(int, worker_niceness, 15) RAY_CONFIG(int64_t, redis_db_connect_retries, 600) RAY_CONFIG(int64_t, redis_db_connect_wait_milliseconds, 100) +/// Number of retries for a redis request failure. +RAY_CONFIG(size_t, num_redis_request_retries, 5) + +/// Exponential backoff setup. By default: +/// 100ms, 200ms, 400ms, 800ms, 1s, 1s,... +RAY_CONFIG(int64_t, redis_retry_base_ms, 100) +RAY_CONFIG(int64_t, redis_retry_multiplier, 2) +RAY_CONFIG(int64_t, redis_retry_max_ms, 1000) + /// The object manager's global timer interval in milliseconds. RAY_CONFIG(int, object_manager_timer_freq_ms, 100) diff --git a/src/ray/common/test_util.cc b/src/ray/common/test_util.cc index 853c8102694e..e4fad8e61765 100644 --- a/src/ray/common/test_util.cc +++ b/src/ray/common/test_util.cc @@ -30,18 +30,19 @@ namespace ray { -void TestSetupUtil::StartUpRedisServers(const std::vector &redis_server_ports) { +void TestSetupUtil::StartUpRedisServers(const std::vector &redis_server_ports, + bool save) { if (redis_server_ports.empty()) { - TEST_REDIS_SERVER_PORTS.push_back(StartUpRedisServer(0)); + TEST_REDIS_SERVER_PORTS.push_back(StartUpRedisServer(0, save)); } else { for (const auto &port : redis_server_ports) { - TEST_REDIS_SERVER_PORTS.push_back(StartUpRedisServer(port)); + TEST_REDIS_SERVER_PORTS.push_back(StartUpRedisServer(port, save)); } } } // start a redis server with specified port, use random one when 0 given -int TestSetupUtil::StartUpRedisServer(const int &port) { +int TestSetupUtil::StartUpRedisServer(int port, bool save) { int actual_port = port; if (port == 0) { static std::atomic srand_called(false); @@ -58,8 +59,12 @@ int TestSetupUtil::StartUpRedisServer(const int &port) { #ifdef _WIN32 std::vector cmdargs({program, "--loglevel", "warning"}); #else - std::vector cmdargs( - {program, "--loglevel", "warning", "--save", "", "--appendonly", "no"}); + std::vector cmdargs; + if (!save) { + cmdargs = {program, "--loglevel", "warning", "--save", "", "--appendonly", "no"}; + } else { + cmdargs = {program, "--loglevel", "warning"}; + } #endif cmdargs.insert(cmdargs.end(), {"--port", std::to_string(actual_port)}); RAY_LOG(INFO) << "Start redis command is: " << CreateCommandLine(cmdargs); @@ -75,7 +80,7 @@ void TestSetupUtil::ShutDownRedisServers() { TEST_REDIS_SERVER_PORTS = std::vector(); } -void TestSetupUtil::ShutDownRedisServer(const int &port) { +void TestSetupUtil::ShutDownRedisServer(int port) { std::vector cmdargs( {TEST_REDIS_CLIENT_EXEC_PATH, "-p", std::to_string(port), "shutdown"}); RAY_LOG(INFO) << "Stop redis command is: " << CreateCommandLine(cmdargs); @@ -91,7 +96,17 @@ void TestSetupUtil::FlushAllRedisServers() { } } -void TestSetupUtil::FlushRedisServer(const int &port) { +void TestSetupUtil::ExecuteRedisCmd(int port, std::vector cmd) { + std::vector cmdargs( + {TEST_REDIS_CLIENT_EXEC_PATH, "-p", std::to_string(port)}); + cmdargs.insert(cmdargs.end(), cmd.begin(), cmd.end()); + RAY_LOG(INFO) << "Send command to redis: " << CreateCommandLine(cmdargs); + if (Process::Call(cmdargs)) { + RAY_LOG(WARNING) << "Failed to send request to redis."; + } +} + +void TestSetupUtil::FlushRedisServer(int port) { std::vector cmdargs( {TEST_REDIS_CLIENT_EXEC_PATH, "-p", std::to_string(port), "flushall"}); RAY_LOG(INFO) << "Cleaning up redis with command: " << CreateCommandLine(cmdargs); diff --git a/src/ray/common/test_util.h b/src/ray/common/test_util.h index c87e260960b5..d91fb62b10f5 100644 --- a/src/ray/common/test_util.h +++ b/src/ray/common/test_util.h @@ -112,7 +112,8 @@ extern std::string TEST_MOCK_WORKER_EXEC_PATH; /// 5. start/stop raylet monitor class TestSetupUtil { public: - static void StartUpRedisServers(const std::vector &redis_server_ports); + static void StartUpRedisServers(const std::vector &redis_server_ports, + bool save = false); static void ShutDownRedisServers(); static void FlushAllRedisServers(); @@ -124,11 +125,10 @@ class TestSetupUtil { const std::string &resource, std::string *store_socket_name); static void StopRaylet(const std::string &raylet_socket_name); - - private: - static int StartUpRedisServer(const int &port); - static void ShutDownRedisServer(const int &port); - static void FlushRedisServer(const int &port); + static void ExecuteRedisCmd(int port, std::vector cmd); + static int StartUpRedisServer(int port, bool save = false); + static void ShutDownRedisServer(int port); + static void FlushRedisServer(int port); }; } // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_redis_failure_detector.cc b/src/ray/gcs/gcs_server/gcs_redis_failure_detector.cc index 6cd8fce9a466..54f705aaa7a2 100644 --- a/src/ray/gcs/gcs_server/gcs_redis_failure_detector.cc +++ b/src/ray/gcs/gcs_server/gcs_redis_failure_detector.cc @@ -48,13 +48,7 @@ void GcsRedisFailureDetector::DetectRedis() { callback_(); } }; - - Status status = redis_context_->RunArgvAsync({"PING"}, redis_callback); - - if (!status.ok()) { - RAY_LOG(ERROR) << "Redis is disconnected."; - callback_(); - } + redis_context_->RunArgvAsync({"PING"}, redis_callback); } } // namespace gcs diff --git a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc index cf5078762e1f..ebcfcb218d49 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc @@ -63,7 +63,6 @@ class GcsServerTest : public ::testing::Test { gcs_server_->Stop(); thread_io_service_->join(); gcs_server_.reset(); - ray::gcs::RedisCallbackManager::instance().Clear(); rpc::ResetServerCallExecutor(); } diff --git a/src/ray/gcs/redis_context.cc b/src/ray/gcs/redis_context.cc index 361d40021744..f7e2630ddb60 100644 --- a/src/ray/gcs/redis_context.cc +++ b/src/ray/gcs/redis_context.cc @@ -16,6 +16,7 @@ #include +#include "ray/common/asio/asio_util.h" #include "ray/stats/metric_defs.h" #include "ray/util/util.h" @@ -26,33 +27,10 @@ extern "C" { } // TODO(pcm): Integrate into the C++ tree. +#include "absl/strings/str_join.h" #include "absl/strings/str_split.h" #include "ray/common/ray_config.h" -namespace { - -/// A helper function to call the callback and delete it from the callback -/// manager if necessary. -void ProcessCallback(int64_t callback_index, - std::shared_ptr callback_reply) { - RAY_CHECK(callback_index >= 0) << "The callback index must be greater than 0, " - << "but it actually is " << callback_index; - auto callback_item = - ray::gcs::RedisCallbackManager::instance().GetCallback(callback_index); - - // Record the redis latency - auto end_time = absl::GetCurrentTimeNanos() / 1000; - ray::stats::GcsLatency().Record(end_time - callback_item->start_time_); - - // Dispatch the callback. - callback_item->Dispatch(callback_reply); - - // Delete the callback - ray::gcs::RedisCallbackManager::instance().RemoveCallback(callback_index); -} - -} // namespace - namespace ray { namespace gcs { @@ -65,7 +43,7 @@ CallbackReply::CallbackReply(redisReply *redis_reply) : reply_type_(redis_reply- break; } case REDIS_REPLY_ERROR: { - RAY_CHECK(false) << "Got an error in redis reply: " << redis_reply->str; + RAY_LOG(FATAL) << "Got an error in redis reply: " << redis_reply->str; break; } case REDIS_REPLY_INTEGER: { @@ -99,6 +77,8 @@ CallbackReply::CallbackReply(redisReply *redis_reply) : reply_type_(redis_reply- } } +bool CallbackReply::IsError() const { return reply_type_ == REDIS_REPLY_ERROR; } + void CallbackReply::ParseAsStringArrayOrScanArray(redisReply *redis_reply) { RAY_CHECK(REDIS_REPLY_ARRAY == redis_reply->type); const auto array_size = static_cast(redis_reply->elements); @@ -172,55 +152,72 @@ const std::vector> &CallbackReply::ReadAsStringArray( return string_array_reply_; } -// This is a global redis callback which will be registered for every -// asynchronous redis call. It dispatches the appropriate callback -// that was registered with the RedisCallbackManager. -void GlobalRedisCallback(void *c, void *r, void *privdata) { - if (r == nullptr) { - return; +RedisRequestContext::RedisRequestContext(instrumented_io_context &io_service, + RedisCallback callback, + RedisAsyncContext *context, + std::vector args) + : exp_back_off_(RayConfig::instance().redis_retry_base_ms(), + RayConfig::instance().redis_retry_multiplier(), + RayConfig::instance().redis_retry_max_ms()), + io_service_(io_service), + redis_context_(context), + pending_retries_(RayConfig::instance().num_redis_request_retries() + 1), + callback_(std::move(callback)), + start_time_(absl::Now()), + redis_cmds_(std::move(args)) { + for (size_t i = 0; i < redis_cmds_.size(); ++i) { + argv_.push_back(redis_cmds_[i].data()); + argc_.push_back(redis_cmds_[i].size()); } - int64_t callback_index = reinterpret_cast(privdata); - redisReply *reply = reinterpret_cast(r); - ProcessCallback(callback_index, std::make_shared(reply)); -} - -int64_t RedisCallbackManager::AllocateCallbackIndex() { - std::lock_guard lock(mutex_); - return num_callbacks_++; } -int64_t RedisCallbackManager::AddCallback(const RedisCallback &function, - instrumented_io_context &io_service, - int64_t callback_index) { - auto start_time = absl::GetCurrentTimeNanos() / 1000; - - std::lock_guard lock(mutex_); - if (callback_index == -1) { - // No callback index was specified. Allocate a new callback index. - callback_index = num_callbacks_; - num_callbacks_++; +void RedisRequestContext::Run() { + if (pending_retries_ == 0) { + RAY_LOG(FATAL) << "Failed to run redis cmds: [" << absl::StrJoin(redis_cmds_, " ") + << "] for " << RayConfig::instance().num_redis_request_retries() + << " times."; + } + + --pending_retries_; + + auto fn = + +[](struct redisAsyncContext *async_context, void *raw_reply, void *privdata) { + auto *request_cxt = (RedisRequestContext *)privdata; + auto redis_reply = reinterpret_cast(raw_reply); + // Error happened. + if (redis_reply == nullptr || redis_reply->type == REDIS_REPLY_ERROR) { + auto error_msg = redis_reply ? redis_reply->str : async_context->errstr; + RAY_LOG(ERROR) << "Redis request [" + << absl::StrJoin(request_cxt->redis_cmds_, " ") << "]" + << " failed due to error " << error_msg << ". " + << request_cxt->pending_retries_ << " retries left."; + auto delay = request_cxt->exp_back_off_.Current(); + request_cxt->exp_back_off_.Next(); + // Retry the request after a while. + execute_after( + request_cxt->io_service_, + [request_cxt]() { request_cxt->Run(); }, + std::chrono::milliseconds(delay)); + } else { + auto reply = std::make_shared(redis_reply); + request_cxt->io_service_.post( + [reply, callback = std::move(request_cxt->callback_)]() { + callback(std::move(reply)); + }, + "RedisRequestContext.Callback"); + auto end_time = absl::Now(); + ray::stats::GcsLatency().Record((end_time - request_cxt->start_time_) / + absl::Milliseconds(1)); + delete request_cxt; + } + }; + + Status status = redis_context_->RedisAsyncCommandArgv( + fn, this, argv_.size(), argv_.data(), argc_.data()); + + if (!status.ok()) { + fn(redis_context_->GetRawRedisAsyncContext(), nullptr, this); } - callback_items_.emplace( - callback_index, std::make_shared(function, start_time, io_service)); - return callback_index; -} - -std::shared_ptr RedisCallbackManager::GetCallback( - int64_t callback_index) const { - std::lock_guard lock(mutex_); - auto it = callback_items_.find(callback_index); - RAY_CHECK(it != callback_items_.end()) << callback_index; - return it->second; -} - -void RedisCallbackManager::Clear() { - std::lock_guard lock(mutex_); - callback_items_.clear(); -} - -void RedisCallbackManager::RemoveCallback(int64_t callback_index) { - std::lock_guard lock(mutex_); - callback_items_.erase(callback_index); } #define REDIS_CHECK_ERROR(CONTEXT, REPLY) \ @@ -550,26 +547,14 @@ std::unique_ptr RedisContext::RunArgvSync( return callback_reply; } -Status RedisContext::RunArgvAsync(const std::vector &args, - const RedisCallback &redis_callback) { +void RedisContext::RunArgvAsync(std::vector args, + RedisCallback redis_callback) { RAY_CHECK(redis_async_context_); - // Build the arguments. - std::vector argv; - std::vector argc; - for (size_t i = 0; i < args.size(); ++i) { - argv.push_back(args[i].data()); - argc.push_back(args[i].size()); - } - int64_t callback_index = - RedisCallbackManager::instance().AddCallback(redis_callback, io_service_); - // Run the Redis command. - Status status = redis_async_context_->RedisAsyncCommandArgv( - reinterpret_cast(&GlobalRedisCallback), - reinterpret_cast(callback_index), - args.size(), - argv.data(), - argc.data()); - return status; + auto request_context = new RedisRequestContext(io_service_, + std::move(redis_callback), + redis_async_context_.get(), + std::move(args)); + request_context->Run(); } void RedisContext::FreeRedisReply(void *reply) { return freeReplyObject(reply); } diff --git a/src/ray/gcs/redis_context.h b/src/ray/gcs/redis_context.h index 0287d1ef7b79..c082646a88f8 100644 --- a/src/ray/gcs/redis_context.h +++ b/src/ray/gcs/redis_context.h @@ -46,6 +46,9 @@ class CallbackReply { /// Whether this reply is `nil` type reply. bool IsNil() const; + /// Whether an error happened; + bool IsError() const; + /// Read this reply data as an integer. int64_t ReadAsInteger() const; @@ -104,62 +107,25 @@ class CallbackReply { /// operation. using RedisCallback = std::function)>; -void GlobalRedisCallback(void *c, void *r, void *privdata); - -class RedisCallbackManager { - public: - static RedisCallbackManager &instance() { - static RedisCallbackManager instance; - return instance; - } - - struct CallbackItem : public std::enable_shared_from_this { - CallbackItem() = default; - - CallbackItem(const RedisCallback &callback, - int64_t start_time, - instrumented_io_context &io_service) - : callback_(callback), start_time_(start_time), io_service_(&io_service) {} - - void Dispatch(std::shared_ptr &reply) { - std::shared_ptr self = shared_from_this(); - if (callback_ != nullptr) { - io_service_->post([self, reply]() { self->callback_(std::move(reply)); }, - "RedisCallbackManager.DispatchCallback"); - } - } - - RedisCallback callback_; - int64_t start_time_; - instrumented_io_context *io_service_; - }; +struct RedisRequestContext { + RedisRequestContext(instrumented_io_context &io_service, + RedisCallback callback, + RedisAsyncContext *context, + std::vector args); - /// Allocate an index at which we can add a callback later on. - int64_t AllocateCallbackIndex(); - - /// Add a callback at an optionally specified index. - int64_t AddCallback(const RedisCallback &function, - instrumented_io_context &io_service, - int64_t callback_index = -1); - - /// Remove a callback. - void RemoveCallback(int64_t callback_index); - - /// Get a callback. - std::shared_ptr GetCallback(int64_t callback_index) const; - - /// Clear all callbacks. - void Clear(); + void Run(); private: - RedisCallbackManager() : num_callbacks_(0){}; - - ~RedisCallbackManager() {} - - mutable std::mutex mutex_; - - int64_t num_callbacks_ = 0; - absl::flat_hash_map> callback_items_; + ExponentialBackOff exp_back_off_; + instrumented_io_context &io_service_; + RedisAsyncContext *redis_context_; + size_t pending_retries_; + RedisCallback callback_; + absl::Time start_time_; + + std::vector redis_cmds_; + std::vector argv_; + std::vector argc_; }; class RedisContext { @@ -195,8 +161,8 @@ class RedisContext { /// \param args The vector of command args to pass to Redis. /// \param redis_callback The Redis callback function. /// \return Status. - Status RunArgvAsync(const std::vector &args, - const RedisCallback &redis_callback = nullptr); + void RunArgvAsync(std::vector args, + RedisCallback redis_callback = nullptr); redisContext *sync_context() { RAY_CHECK(context_); diff --git a/src/ray/gcs/store_client/redis_store_client.cc b/src/ray/gcs/store_client/redis_store_client.cc index 1c1ad0bc27ff..a20aa744de96 100644 --- a/src/ray/gcs/store_client/redis_store_client.cc +++ b/src/ray/gcs/store_client/redis_store_client.cc @@ -301,22 +301,22 @@ void RedisStoreClient::SendRedisCmd(std::vector keys, } // Send the actual request auto cxt = redis_client_->GetShardContext(""); - RAY_CHECK_OK(cxt->RunArgvAsync( - std::move(args), - [this, keys = std::move(keys), redis_callback = std::move(redis_callback)]( - auto reply) { - std::vector> requests; - { - absl::MutexLock lock(&mu_); - requests = TakeRequestsFromSendingQueue(keys); - } - for (auto &request : requests) { - request(); - } - if (redis_callback) { - redis_callback(reply); - } - })); + cxt->RunArgvAsync(std::move(args), + [this, + keys = std::move(keys), + redis_callback = std::move(redis_callback)](auto reply) { + std::vector> requests; + { + absl::MutexLock lock(&mu_); + requests = TakeRequestsFromSendingQueue(keys); + } + for (auto &request : requests) { + request(); + } + if (redis_callback) { + redis_callback(reply); + } + }); }; { @@ -432,10 +432,7 @@ void RedisStoreClient::RedisScanner::Scan(const std::string &match_pattern, "COUNT", std::to_string(batch_count)}; auto shard_context = redis_client_->GetShardContexts()[shard_index]; - Status status = shard_context->RunArgvAsync(args, scan_callback); - if (!status.ok()) { - RAY_LOG(FATAL) << "Scan failed, status " << status.ToString(); - } + shard_context->RunArgvAsync(args, scan_callback); } } diff --git a/src/ray/gcs/store_client/test/redis_store_client_test.cc b/src/ray/gcs/store_client/test/redis_store_client_test.cc index 6041512916de..af88a26f628f 100644 --- a/src/ray/gcs/store_client/test/redis_store_client_test.cc +++ b/src/ray/gcs/store_client/test/redis_store_client_test.cc @@ -14,17 +14,26 @@ #include "ray/gcs/store_client/redis_store_client.h" +#include + #include "ray/common/test_util.h" #include "ray/gcs/redis_client.h" #include "ray/gcs/store_client/test/store_client_test_base.h" +using namespace std::chrono_literals; namespace ray { namespace gcs { class RedisStoreClientTest : public StoreClientTestBase { public: - RedisStoreClientTest() {} + RedisStoreClientTest() { + if (std::getenv("REDIS_CHAOS") != nullptr) { + ::RayConfig::instance().num_redis_request_retries() = 1000; + ::RayConfig::instance().redis_retry_base_ms() = 10; + ::RayConfig::instance().redis_retry_max_ms() = 100; + } + } virtual ~RedisStoreClientTest() {} @@ -32,6 +41,30 @@ class RedisStoreClientTest : public StoreClientTestBase { static void TearDownTestCase() { TestSetupUtil::ShutDownRedisServers(); } + void SetUp() override { + auto port = TEST_REDIS_SERVER_PORTS.front(); + TestSetupUtil::FlushRedisServer(port); + StoreClientTestBase::SetUp(); + if (std::getenv("REDIS_CHAOS") != nullptr) { + t_ = std::make_unique([this, port]() { + while (!stopped_) { + TestSetupUtil::ExecuteRedisCmd(port, {"REPLICAOF", "localhost", "1234"}); + std::this_thread::sleep_for(50ms); + TestSetupUtil::ExecuteRedisCmd(port, {"REPLICAOF", "NO", "ONE"}); + std::this_thread::sleep_for(200ms); + } + }); + } + } + + void TearDown() override { + stopped_ = true; + if (t_) { + t_->join(); + } + StoreClientTestBase::TearDown(); + } + void InitStoreClient() override { RedisClientOptions options("127.0.0.1", TEST_REDIS_SERVER_PORTS.front(), @@ -47,6 +80,8 @@ class RedisStoreClientTest : public StoreClientTestBase { protected: std::shared_ptr redis_client_; + std::unique_ptr t_; + std::atomic stopped_ = false; }; TEST_F(RedisStoreClientTest, AsyncPutAndAsyncGetTest) { TestAsyncPutAndAsyncGet(); } @@ -325,7 +360,7 @@ TEST_F(RedisStoreClientTest, Random) { auto idx = std::rand() % ops.size(); ops[idx](i); } - EXPECT_TRUE(WaitForCondition([&counter]() { return *counter == 0; }, 5000)); + EXPECT_TRUE(WaitForCondition([&counter]() { return *counter == 0; }, 10000)); auto redis_store_client_raw_ptr = (RedisStoreClient *)store_client_.get(); absl::MutexLock lock(&redis_store_client_raw_ptr->mu_); ASSERT_TRUE(redis_store_client_raw_ptr->pending_redis_request_by_key_.empty()); diff --git a/src/ray/gcs/store_client/test/store_client_test_base.h b/src/ray/gcs/store_client/test/store_client_test_base.h index ce974cc66d62..ebdee3c683d6 100644 --- a/src/ray/gcs/store_client/test/store_client_test_base.h +++ b/src/ray/gcs/store_client/test/store_client_test_base.h @@ -65,10 +65,10 @@ class StoreClientTestBase : public ::testing::Test { for (const auto &[key, value] : key_to_value_) { ++pending_count_; RAY_CHECK_OK(store_client_->AsyncPut( - table_name_, key.Binary(), value.SerializeAsString(), true, put_calllback)); + table_name_, key.Hex(), value.SerializeAsString(), true, put_calllback)); // Make sure no-op callback is handled well RAY_CHECK_OK(store_client_->AsyncPut( - table_name_, key.Binary(), value.SerializeAsString(), true, nullptr)); + table_name_, key.Hex(), value.SerializeAsString(), true, nullptr)); } WaitPendingDone(); } @@ -77,10 +77,9 @@ class StoreClientTestBase : public ::testing::Test { auto delete_calllback = [this](auto) { --pending_count_; }; for (const auto &[key, _] : key_to_value_) { ++pending_count_; - RAY_CHECK_OK( - store_client_->AsyncDelete(table_name_, key.Binary(), delete_calllback)); + RAY_CHECK_OK(store_client_->AsyncDelete(table_name_, key.Hex(), delete_calllback)); // Make sure no-op callback is handled well - RAY_CHECK_OK(store_client_->AsyncDelete(table_name_, key.Binary(), nullptr)); + RAY_CHECK_OK(store_client_->AsyncDelete(table_name_, key.Hex(), nullptr)); } WaitPendingDone(); } @@ -99,14 +98,14 @@ class StoreClientTestBase : public ::testing::Test { }; for (const auto &[key, _] : key_to_value_) { ++pending_count_; - RAY_CHECK_OK(store_client_->AsyncGet(table_name_, key.Binary(), get_callback)); + RAY_CHECK_OK(store_client_->AsyncGet(table_name_, key.Hex(), get_callback)); } WaitPendingDone(); } void GetEmpty() { for (const auto &[k, _] : key_to_value_) { - auto key = k.Binary(); + auto key = k.Hex(); auto get_callback = [this, key](const Status &status, const boost::optional &result) { RAY_CHECK_OK(status); @@ -126,7 +125,7 @@ class StoreClientTestBase : public ::testing::Test { RAY_LOG(INFO) << "ReceivedKeys=" << result.size(); static std::unordered_set received_keys; for (const auto &item : result) { - const ActorID &actor_id = ActorID::FromBinary(item.first); + const ActorID &actor_id = ActorID::FromHex(item.first); auto it = received_keys.find(actor_id); RAY_CHECK(it == received_keys.end()); received_keys.emplace(actor_id); @@ -145,14 +144,14 @@ class StoreClientTestBase : public ::testing::Test { void GetKeys() { for (int i = 0; i < 100; i++) { - auto key = keys_.at(std::rand() % keys_.size()).Binary(); + auto key = keys_.at(std::rand() % keys_.size()).Hex(); auto prefix = key.substr(0, std::rand() % key.size()); RAY_LOG(INFO) << "key is: " << std::hex << key << ", prefix is: " << std::hex << prefix; std::unordered_set result_set; for (const auto &item1 : key_to_value_) { - if (item1.first.Binary().find(prefix) == 0) { - result_set.insert(item1.first.Binary()); + if (item1.first.Hex().find(prefix) == 0) { + result_set.insert(item1.first.Hex()); } } ASSERT_FALSE(result_set.empty()); @@ -179,7 +178,7 @@ class StoreClientTestBase : public ::testing::Test { pending_count_ += key_to_value_.size(); for (const auto &item : key_to_value_) { RAY_CHECK_OK( - store_client_->AsyncExists(table_name_, item.first.Binary(), exists_callback)); + store_client_->AsyncExists(table_name_, item.first.Hex(), exists_callback)); } WaitPendingDone(); } @@ -189,7 +188,7 @@ class StoreClientTestBase : public ::testing::Test { ++pending_count_; std::vector keys; for (auto &[key, _] : key_to_value_) { - keys.push_back(key.Binary()); + keys.push_back(key.Hex()); } RAY_CHECK_OK(store_client_->AsyncBatchDelete(table_name_, keys, delete_calllback)); // Make sure no-op callback is handled well