diff --git a/.bazelrc b/.bazelrc
index 1c4b9f296b8f..1b98b8f72a3f 100644
--- a/.bazelrc
+++ b/.bazelrc
@@ -97,6 +97,7 @@ build:tsan --copt -g
 build:tsan --copt -fno-omit-frame-pointer
 build:tsan --copt -Wno-uninitialized
 build:tsan --linkopt -fsanitize=thread
+build:tsan --cxxopt="-D_RAY_TSAN_BUILD"
 # This config is only for running TSAN with LLVM toolchain on Linux.
 build:tsan-clang --config=tsan
 build:tsan-clang --config=llvm
diff --git a/BUILD.bazel b/BUILD.bazel
index 10d3b0e621fc..77840d8252c6 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -1844,7 +1844,7 @@ cc_library(
 cc_test(
     name = "gcs_health_check_manager_test",
-    size = "small",
+    size = "medium",
     srcs = [
         "src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc",
     ],
diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc
index 831f345ef81a..7a3d99e74e05 100644
--- a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc
+++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc
@@ -53,6 +53,7 @@ void GcsHealthCheckManager::RemoveNode(const NodeID &node_id) {
         if (iter == health_check_contexts_.end()) {
           return;
         }
+        iter->second->Stop();
         health_check_contexts_.erase(iter);
       },
       "GcsHealthCheckManager::RemoveNode");
@@ -60,8 +61,11 @@ void GcsHealthCheckManager::RemoveNode(const NodeID &node_id) {
 
 void GcsHealthCheckManager::FailNode(const NodeID &node_id) {
   RAY_LOG(WARNING) << "Node " << node_id << " is dead because the health check failed.";
-  on_node_death_callback_(node_id);
-  health_check_contexts_.erase(node_id);
+  auto iter = health_check_contexts_.find(node_id);
+  if (iter != health_check_contexts_.end()) {
+    on_node_death_callback_(node_id);
+    health_check_contexts_.erase(iter);
+  }
 }
 
 std::vector<NodeID> GcsHealthCheckManager::GetAllNodes() const {
@@ -75,27 +79,23 @@ std::vector<NodeID> GcsHealthCheckManager::GetAllNodes() const {
 void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() {
   using ::grpc::health::v1::HealthCheckResponse;
 
-  context_ = std::make_shared<grpc::ClientContext>();
+  // Reset the context/request/response for the next request.
+  context_.~ClientContext();
+  new (&context_) grpc::ClientContext();
+  response_.Clear();
 
   auto deadline = std::chrono::system_clock::now() +
                   std::chrono::milliseconds(manager_->timeout_ms_);
-  context_->set_deadline(deadline);
+  context_.set_deadline(deadline);
   stub_->async()->Check(
-      context_.get(),
-      &request_,
-      &response_,
-      [this, stopped = this->stopped_, context = this->context_, now = absl::Now()](
-          ::grpc::Status status) {
+      &context_, &request_, &response_, [this, now = absl::Now()](::grpc::Status status) {
        // This callback is done in gRPC's thread pool.
         STATS_health_check_rpc_latency_ms.Record(
             absl::ToInt64Milliseconds(absl::Now() - now));
-        if (status.error_code() == ::grpc::StatusCode::CANCELLED) {
-          return;
-        }
         manager_->io_service_.post(
-            [this, stopped, status]() {
-              // Stopped has to be read in the same thread where it's updated.
-              if (*stopped) {
+            [this, status]() {
+              if (stopped_) {
+                delete this;
                 return;
               }
               RAY_LOG(DEBUG) << "Health check status: " << int(response_.status());
@@ -110,32 +110,28 @@
               }
 
               if (health_check_remaining_ == 0) {
-                manager_->io_service_.post([this]() { manager_->FailNode(node_id_); },
-                                           "");
+                manager_->FailNode(node_id_);
+                delete this;
               } else {
                 // Do another health check.
                 timer_.expires_from_now(
                     boost::posix_time::milliseconds(manager_->period_ms_));
-                timer_.async_wait([this, stopped](auto ec) {
-                  // We need to check stopped here as well since cancel
-                  // won't impact the queued tasks.
-                  if (ec != boost::asio::error::operation_aborted && !*stopped) {
-                    StartHealthCheck();
-                  }
-                });
+                timer_.async_wait([this](auto) { StartHealthCheck(); });
               }
             },
             "HealthCheck");
       });
 }
 
+void GcsHealthCheckManager::HealthCheckContext::Stop() { stopped_ = true; }
+
 void GcsHealthCheckManager::AddNode(const NodeID &node_id,
                                     std::shared_ptr<grpc::Channel> channel) {
   io_service_.dispatch(
       [this, channel, node_id]() {
         RAY_CHECK(health_check_contexts_.count(node_id) == 0);
-        auto context = std::make_unique<HealthCheckContext>(this, channel, node_id);
-        health_check_contexts_.emplace(std::make_pair(node_id, std::move(context)));
+        auto context = new HealthCheckContext(this, channel, node_id);
+        health_check_contexts_.emplace(std::make_pair(node_id, context));
       },
       "GcsHealthCheckManager::AddNode");
 }
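The .cc change above drops the shared_ptr/TryCancel cancellation scheme for a self-owning context: RemoveNode only flips a flag via Stop(), and the object deletes itself on the io_service thread once its in-flight callback drains, so stopped_ is never read and written on different threads. A minimal sketch of that lifetime pattern, using a hypothetical Session class and a plain boost::asio timer instead of Ray's gRPC machinery:

#include <boost/asio.hpp>
#include <chrono>
#include <iostream>

class Session {
 public:
  explicit Session(boost::asio::io_context &io) : timer_(io) { Tick(); }

  // Called only on the io_context thread, mirroring HealthCheckContext::Stop().
  void Stop() { stopped_ = true; }

 private:
  void Tick() {
    timer_.expires_after(std::chrono::milliseconds(10));
    timer_.async_wait([this](const boost::system::error_code &) {
      if (stopped_) {
        delete this;  // Last reference to `this`; nothing touches it afterwards.
        return;
      }
      std::cout << "tick\n";
      Tick();  // Re-arm, like the periodic health check.
    });
  }

  boost::asio::steady_timer timer_;
  bool stopped_ = false;
};

int main() {
  boost::asio::io_context io;
  auto *session = new Session(io);  // Owns itself; freed via Stop() + callback.
  boost::asio::post(io, [session] { session->Stop(); });
  io.run();  // Returns once the stopped session has deleted itself.
}

Because both Stop() and the deleting callback run on the single event-loop thread, no atomics or shared ownership are needed, which is exactly the data race TSAN flagged in the old design.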
diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.h b/src/ray/gcs/gcs_server/gcs_health_check_manager.h
index e0daea5edfcf..d877a217d803 100644
--- a/src/ray/gcs/gcs_server/gcs_health_check_manager.h
+++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.h
@@ -91,27 +91,16 @@ class GcsHealthCheckManager {
                        NodeID node_id)
         : manager_(manager),
           node_id_(node_id),
-          stopped_(std::make_shared<bool>(false)),
           timer_(manager->io_service_),
           health_check_remaining_(manager->failure_threshold_) {
       request_.set_service(node_id.Hex());
       stub_ = grpc::health::v1::Health::NewStub(channel);
       timer_.expires_from_now(
           boost::posix_time::milliseconds(manager_->initial_delay_ms_));
-      timer_.async_wait([this](auto ec) {
-        if (ec != boost::asio::error::operation_aborted) {
-          StartHealthCheck();
-        }
-      });
+      timer_.async_wait([this](auto) { StartHealthCheck(); });
     }
 
-    ~HealthCheckContext() {
-      timer_.cancel();
-      if (context_ != nullptr) {
-        context_->TryCancel();
-      }
-      *stopped_ = true;
-    }
+    void Stop();
 
    private:
     void StartHealthCheck();
@@ -121,14 +110,12 @@ class GcsHealthCheckManager {
     NodeID node_id_;
 
     // Whether the health check has stopped.
-    std::shared_ptr<bool> stopped_;
+    bool stopped_ = false;
 
     /// gRPC related fields
     std::unique_ptr<::grpc::health::v1::Health::Stub> stub_;
 
-    // The context is used in the gRPC callback which is in another
-    // thread, so we need it to be a shared_ptr.
-    std::shared_ptr<grpc::ClientContext> context_;
+    grpc::ClientContext context_;
 
     ::grpc::health::v1::HealthCheckRequest request_;
     ::grpc::health::v1::HealthCheckResponse response_;
@@ -146,7 +133,7 @@ class GcsHealthCheckManager {
   std::function<void(const NodeID &)> on_node_death_callback_;
 
   /// The context of the health check for each nodes.
-  absl::flat_hash_map<NodeID, std::unique_ptr<HealthCheckContext>> health_check_contexts_;
+  absl::flat_hash_map<NodeID, HealthCheckContext *> health_check_contexts_;
 
   /// The delay for the first health check request.
   const int64_t initial_delay_ms_;
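Storing grpc::ClientContext by value (instead of behind a shared_ptr) is what forces the destroy-and-placement-new reset in StartHealthCheck(): a ClientContext is single-use and neither copyable nor movable, so recycling its storage is the only way to reuse the member across attempts. A minimal sketch of that idiom with a hypothetical Context stand-in type:

#include <new>
#include <string>

// Stand-in for a single-use, non-copyable, non-movable type such as
// grpc::ClientContext.
struct Context {
  std::string deadline;
  Context() = default;
  Context(const Context &) = delete;
  Context &operator=(const Context &) = delete;
};

class Retrier {
 public:
  // Destroy the member in place and rebuild it with placement new, which is
  // the only way to "reset" a type that can be neither assigned nor moved.
  void PrepareNextAttempt() {
    context_.~Context();
    new (&context_) Context();
  }

 private:
  Context context_;
};

int main() {
  Retrier r;
  r.PrepareNextAttempt();  // Safe to repeat between attempts.
  r.PrepareNextAttempt();
}

The by-value member also removes a heap allocation per health-check round, which matters once hundreds of nodes are being probed every period_ms_.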
diff --git a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc
index f4f4bf8cefb2..31be1bbd5d6d 100644
--- a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc
+++ b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc
@@ -19,9 +19,13 @@
 #include <chrono>
 #include <memory>
 #include <thread>
+#include <boost/asio.hpp>
 #include <vector>
 
 using namespace boost;
+using namespace boost::asio;
+using namespace boost::asio::ip;
+
 #include <grpcpp/grpcpp.h>
 #include <grpcpp/health_check_service_interface.h>
 
@@ -30,6 +34,20 @@ using namespace boost;
 #include "gtest/gtest.h"
 #include "ray/gcs/gcs_server/gcs_health_check_manager.h"
 
+int GetFreePort() {
+  io_service io_service;
+  tcp::acceptor acceptor(io_service);
+  tcp::endpoint endpoint;
+
+  // Bind to port 0 so the OS assigns a currently free port.
+  acceptor.open(tcp::v4());
+  acceptor.bind(tcp::endpoint(tcp::v4(), 0));
+  endpoint = acceptor.local_endpoint();
+  auto port = endpoint.port();
+  acceptor.close();
+  return port;
+}
+
 using namespace ray;
 using namespace std::literals::chrono_literals;
 
@@ -46,7 +64,6 @@ class GcsHealthCheckManagerTest : public ::testing::Test {
         timeout_ms,
         period_ms,
         failure_threshold);
-    port = 10000;
   }
 
   void TearDown() override {
@@ -65,7 +82,8 @@ class GcsHealthCheckManagerTest : public ::testing::Test {
   NodeID AddServer(bool alive = true) {
     std::promise<int> port_promise;
     auto node_id = NodeID::FromRandom();
-
+    auto port = GetFreePort();
+    RAY_LOG(INFO) << "Got free port " << port;
     auto server = std::make_shared<rpc::GrpcServer>(node_id.Hex(), port, true);
 
     auto channel = grpc::CreateChannel("localhost:" + std::to_string(port),
@@ -76,7 +94,6 @@ class GcsHealthCheckManagerTest : public ::testing::Test {
     }
     servers.emplace(node_id, server);
     health_check->AddNode(node_id, channel);
-    ++port;
     return node_id;
   }
@@ -115,14 +132,13 @@ class GcsHealthCheckManagerTest : public ::testing::Test {
     }
   }
 
-  int port;
   instrumented_io_context io_service;
   std::unique_ptr<gcs::GcsHealthCheckManager> health_check;
   std::unordered_map<NodeID, std::shared_ptr<rpc::GrpcServer>> servers;
   std::unordered_set<NodeID> dead_nodes;
-  const int64_t initial_delay_ms = 1000;
-  const int64_t timeout_ms = 1000;
-  const int64_t period_ms = 1000;
+  const int64_t initial_delay_ms = 100;
+  const int64_t timeout_ms = 10;
+  const int64_t period_ms = 10;
   const int64_t failure_threshold = 5;
 };
 
@@ -143,8 +159,6 @@ TEST_F(GcsHealthCheckManagerTest, TestBasic) {
     Run(2);  // One for starting RPC and one for the RPC callback.
   }
 
-  Run();  // For failure callback.
-
   ASSERT_EQ(1, dead_nodes.size());
   ASSERT_TRUE(dead_nodes.count(node_id));
 }
@@ -169,8 +183,6 @@ TEST_F(GcsHealthCheckManagerTest, StoppedAndResume) {
     }
   }
 
-  Run();  // For failure callback.
-
   ASSERT_EQ(0, dead_nodes.size());
 }
@@ -196,8 +208,6 @@ TEST_F(GcsHealthCheckManagerTest, Crashed) {
    Run(2);  // One for starting RPC and one for the RPC callback.
   }
 
-  Run();  // For failure callback.
-
   ASSERT_EQ(1, dead_nodes.size());
   ASSERT_TRUE(dead_nodes.count(node_id));
 }
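The test fixture now asks the OS for ports instead of assuming 10000 and up are free. For reference, a standalone, compilable version of the same port-0 trick (names are illustrative, not from the Ray tree); note the small inherent race: another process can grab the port between close() and the server's own bind, which a test can tolerate:

#include <boost/asio.hpp>
#include <iostream>

int GetFreePortStandalone() {
  boost::asio::io_context io;
  boost::asio::ip::tcp::acceptor acceptor(io);
  acceptor.open(boost::asio::ip::tcp::v4());
  // Port 0 asks the kernel for any unused port; read back what it chose.
  acceptor.bind(boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0));
  int port = acceptor.local_endpoint().port();
  acceptor.close();  // Release it so the test server can bind it next.
  return port;
}

int main() { std::cout << GetFreePortStandalone() << std::endl; }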
@@ -230,12 +240,49 @@ TEST_F(GcsHealthCheckManagerTest, NoRegister) {
     Run(2);  // One for starting RPC and one for the RPC callback.
   }
 
-  Run(2);
+  Run(1);
   ASSERT_EQ(1, dead_nodes.size());
   ASSERT_TRUE(dead_nodes.count(node_id));
 }
 
+TEST_F(GcsHealthCheckManagerTest, StressTest) {
+#ifdef _RAY_TSAN_BUILD
+  GTEST_SKIP() << "Disabled in tsan because of performance";
+#endif
+  boost::asio::io_service::work work(io_service);
+  std::srand(std::time(nullptr));
+  auto t = std::make_unique<std::thread>([this]() { this->io_service.run(); });
+
+  std::vector<NodeID> alive_nodes;
+
+  for (int i = 0; i < 200; ++i) {
+    alive_nodes.emplace_back(AddServer(true));
+    std::this_thread::sleep_for(10ms);
+  }
+
+  for (size_t i = 0; i < 20000UL; ++i) {
+    RAY_LOG(INFO) << "Progress: " << i << "/20000";
+    auto iter = alive_nodes.begin() + std::rand() % alive_nodes.size();
+    health_check->RemoveNode(*iter);
+    DeleteServer(*iter);
+    alive_nodes.erase(iter);
+    alive_nodes.emplace_back(AddServer(true));
+  }
+  RAY_LOG(INFO) << "Finished!";
+  io_service.stop();
+  t->join();
+}
+
 int main(int argc, char **argv) {
+  InitShutdownRAII ray_log_shutdown_raii(ray::RayLog::StartRayLog,
+                                         ray::RayLog::ShutDownRayLog,
+                                         argv[0],
+                                         ray::RayLogLevel::INFO,
+                                         /*log_dir=*/"");
+
+  ray::RayLog::InstallFailureSignalHandler(argv[0]);
+  ray::RayLog::InstallTerminateHandler();
+
   ::testing::InitGoogleTest(&argc, argv);
   return RUN_ALL_TESTS();
 }
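The -D_RAY_TSAN_BUILD define added in .bazelrc is what makes the GTEST_SKIP guard in StressTest compile-time selectable: under `--config=tsan` the macro exists and the test bails out early, while other builds run it in full. A minimal sketch of the same pattern in isolation (suite and test names are made up; link against gtest_main or supply your own main):

#include <gtest/gtest.h>

TEST(ExpensiveSuite, StressLoop) {
#ifdef _RAY_TSAN_BUILD
  // Sanitizer instrumentation slows tight loops by an order of magnitude,
  // so skip rather than time out.
  GTEST_SKIP() << "Too slow under ThreadSanitizer instrumentation";
#endif
  long long sum = 0;
  for (int i = 0; i < 1000000; ++i) sum += i;  // Placeholder for real work.
  EXPECT_GE(sum, 0);
}

Using a skip rather than excluding the target from the TSAN build keeps the test compiling under all configs, so it cannot silently bit-rot.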