From d360389bd5101c3c950e6118d63d786005c08493 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Fri, 20 Jan 2023 21:49:39 +0000 Subject: [PATCH 01/14] potential fix Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- src/ray/gcs/gcs_server/gcs_health_check_manager.cc | 2 +- src/ray/gcs/gcs_server/gcs_health_check_manager.h | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc index 831f345ef81a..419a4f49675a 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc @@ -84,7 +84,7 @@ void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() { context_.get(), &request_, &response_, - [this, stopped = this->stopped_, context = this->context_, now = absl::Now()]( + [this, stub = stub_, stopped = this->stopped_, context = this->context_, now = absl::Now()]( ::grpc::Status status) { // This callback is done in gRPC's thread pool. STATS_health_check_rpc_latency_ms.Record( diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.h b/src/ray/gcs/gcs_server/gcs_health_check_manager.h index e0daea5edfcf..21c53f154d21 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.h +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.h @@ -98,8 +98,8 @@ class GcsHealthCheckManager { stub_ = grpc::health::v1::Health::NewStub(channel); timer_.expires_from_now( boost::posix_time::milliseconds(manager_->initial_delay_ms_)); - timer_.async_wait([this](auto ec) { - if (ec != boost::asio::error::operation_aborted) { + timer_.async_wait([this, stopped = stopped_](auto ec) { + if (!*stopped && ec != boost::asio::error::operation_aborted) { StartHealthCheck(); } }); @@ -124,7 +124,7 @@ class GcsHealthCheckManager { std::shared_ptr stopped_; /// gRPC related fields - std::unique_ptr<::grpc::health::v1::Health::Stub> stub_; + std::shared_ptr<::grpc::health::v1::Health::Stub> stub_; // The context is used in the gRPC callback which is in another // thread, so we need it to be a shared_ptr. From 71b91f3ad15864e4f0b8f7166a03e20f2abfd11a Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Fri, 20 Jan 2023 21:49:53 +0000 Subject: [PATCH 02/14] fix Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- src/ray/gcs/gcs_server/gcs_health_check_manager.cc | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc index 419a4f49675a..260cd530fa7c 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc @@ -84,8 +84,11 @@ void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() { context_.get(), &request_, &response_, - [this, stub = stub_, stopped = this->stopped_, context = this->context_, now = absl::Now()]( - ::grpc::Status status) { + [this, + stub = stub_, + stopped = this->stopped_, + context = this->context_, + now = absl::Now()](::grpc::Status status) { // This callback is done in gRPC's thread pool. STATS_health_check_rpc_latency_ms.Record( absl::ToInt64Milliseconds(absl::Now() - now)); From 407a8f0830629d8f0c3b15046f058914b052485c Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Fri, 20 Jan 2023 23:18:34 +0000 Subject: [PATCH 03/14] checking Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- .../gcs/gcs_server/gcs_health_check_manager.h | 2 +- .../test/gcs_health_check_manager_test.cc | 64 +++++++++++++++++-- 2 files changed, 58 insertions(+), 8 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.h b/src/ray/gcs/gcs_server/gcs_health_check_manager.h index 21c53f154d21..b475c4279f04 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.h +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.h @@ -98,7 +98,7 @@ class GcsHealthCheckManager { stub_ = grpc::health::v1::Health::NewStub(channel); timer_.expires_from_now( boost::posix_time::milliseconds(manager_->initial_delay_ms_)); - timer_.async_wait([this, stopped = stopped_](auto ec) { + timer_.async_wait([stopped = stopped_, this](auto ec) { if (!*stopped && ec != boost::asio::error::operation_aborted) { StartHealthCheck(); } diff --git a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc index f4f4bf8cefb2..07d9c749f4f8 100644 --- a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include @@ -22,6 +23,9 @@ #include using namespace boost; +using namespace boost::asio; +using namespace boost::asio::ip; + #include #include @@ -30,6 +34,20 @@ using namespace boost; #include "gtest/gtest.h" #include "ray/gcs/gcs_server/gcs_health_check_manager.h" +int GetFreePort() { + io_service io_service; + tcp::acceptor acceptor(io_service); + tcp::endpoint endpoint; + + // try to bind to port 0 to find a free port + acceptor.open(tcp::v4()); + acceptor.bind(tcp::endpoint(tcp::v4(), 0)); + endpoint = acceptor.local_endpoint(); + auto port = endpoint.port(); + acceptor.close(); + return port; +} + using namespace ray; using namespace std::literals::chrono_literals; @@ -46,7 +64,6 @@ class GcsHealthCheckManagerTest : public ::testing::Test { timeout_ms, period_ms, failure_threshold); - port = 10000; } void TearDown() override { @@ -65,7 +82,8 @@ class GcsHealthCheckManagerTest : public ::testing::Test { NodeID AddServer(bool alive = true) { std::promise port_promise; auto node_id = NodeID::FromRandom(); - + auto port = GetFreePort(); + RAY_LOG(INFO) << "Get port " << port; auto server = std::make_shared(node_id.Hex(), port, true); auto channel = grpc::CreateChannel("localhost:" + std::to_string(port), @@ -76,7 +94,6 @@ class GcsHealthCheckManagerTest : public ::testing::Test { } servers.emplace(node_id, server); health_check->AddNode(node_id, channel); - ++port; return node_id; } @@ -115,14 +132,13 @@ class GcsHealthCheckManagerTest : public ::testing::Test { } } - int port; instrumented_io_context io_service; std::unique_ptr health_check; std::unordered_map> servers; std::unordered_set dead_nodes; - const int64_t initial_delay_ms = 1000; - const int64_t timeout_ms = 1000; - const int64_t period_ms = 1000; + const int64_t initial_delay_ms = 100; + const int64_t timeout_ms = 10; + const int64_t period_ms = 10; const int64_t failure_threshold = 5; }; @@ -235,7 +251,41 @@ TEST_F(GcsHealthCheckManagerTest, NoRegister) { ASSERT_TRUE(dead_nodes.count(node_id)); } +TEST_F(GcsHealthCheckManagerTest, StressTest) { + boost::asio::io_service::work work(io_service); + std::srand(std::time(nullptr)); + auto t = std::make_unique([this]() { + io_service.run(); + }); + + std::vector alive_nodes; + + for(int i = 0; i < 200; ++i) { + alive_nodes.emplace_back(AddServer(true)); + std::this_thread::sleep_for(10ms); + } + + for(size_t i = 0; i < 200000000UL; ++i) { + auto iter = alive_nodes.begin() + std::rand() % alive_nodes.size(); + DeleteServer(*iter); + alive_nodes.erase(iter); + alive_nodes.emplace_back(AddServer(true)); + } + RAY_LOG(INFO) << "Finished!"; + io_service.stop(); + t->join(); +} + int main(int argc, char **argv) { + InitShutdownRAII ray_log_shutdown_raii(ray::RayLog::StartRayLog, + ray::RayLog::ShutDownRayLog, + argv[0], + ray::RayLogLevel::INFO, + /*log_dir=*/""); + + ray::RayLog::InstallFailureSignalHandler(argv[0]); + ray::RayLog::InstallTerminateHandler(); + ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } From 10a6afcec7a702c2b0752b4c61b77051b7ff7160 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Wed, 25 Jan 2023 02:40:27 +0000 Subject: [PATCH 04/14] fix Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- .../gcs_server/test/gcs_health_check_manager_test.cc | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc index 07d9c749f4f8..8d325765af60 100644 --- a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include #include #include @@ -20,6 +19,7 @@ #include #include #include +#include #include using namespace boost; @@ -254,18 +254,16 @@ TEST_F(GcsHealthCheckManagerTest, NoRegister) { TEST_F(GcsHealthCheckManagerTest, StressTest) { boost::asio::io_service::work work(io_service); std::srand(std::time(nullptr)); - auto t = std::make_unique([this]() { - io_service.run(); - }); + auto t = std::make_unique([this]() { io_service.run(); }); std::vector alive_nodes; - for(int i = 0; i < 200; ++i) { + for (int i = 0; i < 200; ++i) { alive_nodes.emplace_back(AddServer(true)); std::this_thread::sleep_for(10ms); } - for(size_t i = 0; i < 200000000UL; ++i) { + for (size_t i = 0; i < 200000000UL; ++i) { auto iter = alive_nodes.begin() + std::rand() % alive_nodes.size(); DeleteServer(*iter); alive_nodes.erase(iter); From 5b265dae681260934203ea09cb1ac36fb187799a Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Wed, 25 Jan 2023 03:54:13 +0000 Subject: [PATCH 05/14] up Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- src/ray/gcs/gcs_server/gcs_health_check_manager.cc | 11 +++++++---- .../gcs_server/test/gcs_health_check_manager_test.cc | 3 ++- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc index 260cd530fa7c..9bd8ee003257 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc @@ -85,14 +85,13 @@ void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() { &request_, &response_, [this, - stub = stub_, stopped = this->stopped_, context = this->context_, now = absl::Now()](::grpc::Status status) { // This callback is done in gRPC's thread pool. STATS_health_check_rpc_latency_ms.Record( absl::ToInt64Milliseconds(absl::Now() - now)); - if (status.error_code() == ::grpc::StatusCode::CANCELLED) { + if (*stopped || status.error_code() == ::grpc::StatusCode::CANCELLED) { return; } manager_->io_service_.post( @@ -113,8 +112,12 @@ void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() { } if (health_check_remaining_ == 0) { - manager_->io_service_.post([this]() { manager_->FailNode(node_id_); }, - ""); + manager_->io_service_.post([this, stopped]() { + if(*stopped) { + return; + } + manager_->FailNode(node_id_); }, + ""); } else { // Do another health check. timer_.expires_from_now( diff --git a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc index 8d325765af60..0d26649775bc 100644 --- a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc @@ -263,8 +263,9 @@ TEST_F(GcsHealthCheckManagerTest, StressTest) { std::this_thread::sleep_for(10ms); } - for (size_t i = 0; i < 200000000UL; ++i) { + for (size_t i = 0; i < 20000UL; ++i) { auto iter = alive_nodes.begin() + std::rand() % alive_nodes.size(); + health_check->RemoveNode(*iter); DeleteServer(*iter); alive_nodes.erase(iter); alive_nodes.emplace_back(AddServer(true)); From 9344a27bb3da836e05c68c7c275f06bad20d1324 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Wed, 25 Jan 2023 03:54:46 +0000 Subject: [PATCH 06/14] fix Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- .../gcs_server/gcs_health_check_manager.cc | 20 +++++++++---------- .../gcs/gcs_server/gcs_health_check_manager.h | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc index 9bd8ee003257..20ba0fe01fdf 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc @@ -84,10 +84,8 @@ void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() { context_.get(), &request_, &response_, - [this, - stopped = this->stopped_, - context = this->context_, - now = absl::Now()](::grpc::Status status) { + [this, stopped = this->stopped_, context = this->context_, now = absl::Now()]( + ::grpc::Status status) { // This callback is done in gRPC's thread pool. STATS_health_check_rpc_latency_ms.Record( absl::ToInt64Milliseconds(absl::Now() - now)); @@ -112,12 +110,14 @@ void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() { } if (health_check_remaining_ == 0) { - manager_->io_service_.post([this, stopped]() { - if(*stopped) { - return; - } - manager_->FailNode(node_id_); }, - ""); + manager_->io_service_.post( + [this, stopped]() { + if (*stopped) { + return; + } + manager_->FailNode(node_id_); + }, + ""); } else { // Do another health check. timer_.expires_from_now( diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.h b/src/ray/gcs/gcs_server/gcs_health_check_manager.h index b475c4279f04..cd72db434d17 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.h +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.h @@ -124,7 +124,7 @@ class GcsHealthCheckManager { std::shared_ptr stopped_; /// gRPC related fields - std::shared_ptr<::grpc::health::v1::Health::Stub> stub_; + std::unique_ptr<::grpc::health::v1::Health::Stub> stub_; // The context is used in the gRPC callback which is in another // thread, so we need it to be a shared_ptr. From b78ef881ac4f2e1ee99753ba43366eb672a97010 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Thu, 26 Jan 2023 00:43:45 +0000 Subject: [PATCH 07/14] fix Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- .../gcs_server/gcs_health_check_manager.cc | 54 ++++++++----------- .../gcs/gcs_server/gcs_health_check_manager.h | 21 ++------ .../test/gcs_health_check_manager_test.cc | 1 + 3 files changed, 28 insertions(+), 48 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc index 20ba0fe01fdf..60560d0b8aac 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc @@ -53,6 +53,7 @@ void GcsHealthCheckManager::RemoveNode(const NodeID &node_id) { if (iter == health_check_contexts_.end()) { return; } + iter->second->Stop(); health_check_contexts_.erase(iter); }, "GcsHealthCheckManager::RemoveNode"); @@ -60,8 +61,11 @@ void GcsHealthCheckManager::RemoveNode(const NodeID &node_id) { void GcsHealthCheckManager::FailNode(const NodeID &node_id) { RAY_LOG(WARNING) << "Node " << node_id << " is dead because the health check failed."; - on_node_death_callback_(node_id); - health_check_contexts_.erase(node_id); + auto iter = health_check_contexts_.find(node_id); + if(iter != health_check_contexts_.end()) { + on_node_death_callback_(node_id); + health_check_contexts_.erase(iter); + } } std::vector GcsHealthCheckManager::GetAllNodes() const { @@ -75,27 +79,23 @@ std::vector GcsHealthCheckManager::GetAllNodes() const { void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() { using ::grpc::health::v1::HealthCheckResponse; - context_ = std::make_shared(); + // Reset the context/request/response for the next request. + context_.~ClientContext(); + new (&context_) grpc::ClientContext(); + response_.Clear(); auto deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(manager_->timeout_ms_); - context_->set_deadline(deadline); + context_.set_deadline(deadline); stub_->async()->Check( - context_.get(), - &request_, - &response_, - [this, stopped = this->stopped_, context = this->context_, now = absl::Now()]( - ::grpc::Status status) { + &context_, &request_, &response_, [this, now = absl::Now()](::grpc::Status status) { // This callback is done in gRPC's thread pool. STATS_health_check_rpc_latency_ms.Record( absl::ToInt64Milliseconds(absl::Now() - now)); - if (*stopped || status.error_code() == ::grpc::StatusCode::CANCELLED) { - return; - } manager_->io_service_.post( - [this, stopped, status]() { - // Stopped has to be read in the same thread where it's updated. - if (*stopped) { + [this, status]() { + if (stopped_) { + delete this; return; } RAY_LOG(DEBUG) << "Health check status: " << int(response_.status()); @@ -110,38 +110,28 @@ void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() { } if (health_check_remaining_ == 0) { - manager_->io_service_.post( - [this, stopped]() { - if (*stopped) { - return; - } - manager_->FailNode(node_id_); - }, - ""); + manager_->FailNode(node_id_); + delete this; } else { // Do another health check. timer_.expires_from_now( boost::posix_time::milliseconds(manager_->period_ms_)); - timer_.async_wait([this, stopped](auto ec) { - // We need to check stopped here as well since cancel - // won't impact the queued tasks. - if (ec != boost::asio::error::operation_aborted && !*stopped) { - StartHealthCheck(); - } - }); + timer_.async_wait([this](auto) { StartHealthCheck(); }); } }, "HealthCheck"); }); } +void GcsHealthCheckManager::HealthCheckContext::Stop() { stopped_ = true; } + void GcsHealthCheckManager::AddNode(const NodeID &node_id, std::shared_ptr channel) { io_service_.dispatch( [this, channel, node_id]() { RAY_CHECK(health_check_contexts_.count(node_id) == 0); - auto context = std::make_unique(this, channel, node_id); - health_check_contexts_.emplace(std::make_pair(node_id, std::move(context))); + auto context = new HealthCheckContext(this, channel, node_id); + health_check_contexts_.emplace(std::make_pair(node_id, context)); }, "GcsHealthCheckManager::AddNode"); } diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.h b/src/ray/gcs/gcs_server/gcs_health_check_manager.h index cd72db434d17..d4fc30b40917 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.h +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.h @@ -91,27 +91,16 @@ class GcsHealthCheckManager { NodeID node_id) : manager_(manager), node_id_(node_id), - stopped_(std::make_shared(false)), timer_(manager->io_service_), health_check_remaining_(manager->failure_threshold_) { request_.set_service(node_id.Hex()); stub_ = grpc::health::v1::Health::NewStub(channel); timer_.expires_from_now( boost::posix_time::milliseconds(manager_->initial_delay_ms_)); - timer_.async_wait([stopped = stopped_, this](auto ec) { - if (!*stopped && ec != boost::asio::error::operation_aborted) { - StartHealthCheck(); - } - }); + timer_.async_wait([this](auto) { StartHealthCheck(); }); } - ~HealthCheckContext() { - timer_.cancel(); - if (context_ != nullptr) { - context_->TryCancel(); - } - *stopped_ = true; - } + void Stop(); private: void StartHealthCheck(); @@ -121,14 +110,14 @@ class GcsHealthCheckManager { NodeID node_id_; // Whether the health check has stopped. - std::shared_ptr stopped_; + bool stopped_ = false; /// gRPC related fields std::unique_ptr<::grpc::health::v1::Health::Stub> stub_; // The context is used in the gRPC callback which is in another // thread, so we need it to be a shared_ptr. - std::shared_ptr context_; + grpc::ClientContext context_; ::grpc::health::v1::HealthCheckRequest request_; ::grpc::health::v1::HealthCheckResponse response_; @@ -146,7 +135,7 @@ class GcsHealthCheckManager { std::function on_node_death_callback_; /// The context of the health check for each nodes. - absl::flat_hash_map> health_check_contexts_; + absl::flat_hash_map health_check_contexts_; /// The delay for the first health check request. const int64_t initial_delay_ms_; diff --git a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc index 0d26649775bc..777bbd5b1170 100644 --- a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc @@ -264,6 +264,7 @@ TEST_F(GcsHealthCheckManagerTest, StressTest) { } for (size_t i = 0; i < 20000UL; ++i) { + RAY_LOG(INFO) << "Progress: " << i << "/20000"; auto iter = alive_nodes.begin() + std::rand() % alive_nodes.size(); health_check->RemoveNode(*iter); DeleteServer(*iter); From ee370a308496d788abd2c5428f28ac4ab37bc70c Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Thu, 26 Jan 2023 00:44:11 +0000 Subject: [PATCH 08/14] format Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- src/ray/gcs/gcs_server/gcs_health_check_manager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc index 60560d0b8aac..7a3d99e74e05 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.cc @@ -62,7 +62,7 @@ void GcsHealthCheckManager::RemoveNode(const NodeID &node_id) { void GcsHealthCheckManager::FailNode(const NodeID &node_id) { RAY_LOG(WARNING) << "Node " << node_id << " is dead because the health check failed."; auto iter = health_check_contexts_.find(node_id); - if(iter != health_check_contexts_.end()) { + if (iter != health_check_contexts_.end()) { on_node_death_callback_(node_id); health_check_contexts_.erase(iter); } From 5231a41f0e263518f39c63b9bed4cc0e68ee0f76 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Thu, 26 Jan 2023 00:50:46 +0000 Subject: [PATCH 09/14] fix Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- BUILD.bazel | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/BUILD.bazel b/BUILD.bazel index 10d3b0e621fc..77840d8252c6 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -1844,7 +1844,7 @@ cc_library( cc_test( name = "gcs_health_check_manager_test", - size = "small", + size = "medium", srcs = [ "src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc", ], From ca37f6c0b43a552ac59d0c5c3be87d84c80e54a4 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Thu, 26 Jan 2023 01:47:10 +0000 Subject: [PATCH 10/14] fix win Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc index 777bbd5b1170..cf1f6d674f1b 100644 --- a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc @@ -254,7 +254,7 @@ TEST_F(GcsHealthCheckManagerTest, NoRegister) { TEST_F(GcsHealthCheckManagerTest, StressTest) { boost::asio::io_service::work work(io_service); std::srand(std::time(nullptr)); - auto t = std::make_unique([this]() { io_service.run(); }); + auto t = std::make_unique([this]() { this->io_service.run(); }); std::vector alive_nodes; From e75a2b64ec9bedb5c8b167ed412a7766b4e8f284 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Thu, 26 Jan 2023 05:00:57 +0000 Subject: [PATCH 11/14] format Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- .../gcs/gcs_server/test/gcs_health_check_manager_test.cc | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc index cf1f6d674f1b..2577447d58f8 100644 --- a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc @@ -159,8 +159,6 @@ TEST_F(GcsHealthCheckManagerTest, TestBasic) { Run(2); // One for starting RPC and one for the RPC callback. } - Run(); // For failure callback. - ASSERT_EQ(1, dead_nodes.size()); ASSERT_TRUE(dead_nodes.count(node_id)); } @@ -185,8 +183,6 @@ TEST_F(GcsHealthCheckManagerTest, StoppedAndResume) { } } - Run(); // For failure callback. - ASSERT_EQ(0, dead_nodes.size()); } @@ -212,8 +208,6 @@ TEST_F(GcsHealthCheckManagerTest, Crashed) { Run(2); // One for starting RPC and one for the RPC callback. } - Run(); // For failure callback. - ASSERT_EQ(1, dead_nodes.size()); ASSERT_TRUE(dead_nodes.count(node_id)); } @@ -246,7 +240,7 @@ TEST_F(GcsHealthCheckManagerTest, NoRegister) { Run(2); // One for starting RPC and one for the RPC callback. } - Run(2); + Run(1); ASSERT_EQ(1, dead_nodes.size()); ASSERT_TRUE(dead_nodes.count(node_id)); } From 84b9f8ef23237ebe787dcba3c3a375a85bfa45cc Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Thu, 26 Jan 2023 06:50:18 +0000 Subject: [PATCH 12/14] skip tsan for stress test Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc index 2577447d58f8..fe05a2a0d0eb 100644 --- a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc @@ -246,6 +246,9 @@ TEST_F(GcsHealthCheckManagerTest, NoRegister) { } TEST_F(GcsHealthCheckManagerTest, StressTest) { +#if defined(__has_feature) && __has_feature(thread_sanitizer) + GTEST_SKIP() << "Disabled in tsan because of performance"; +#endif boost::asio::io_service::work work(io_service); std::srand(std::time(nullptr)); auto t = std::make_unique([this]() { this->io_service.run(); }); From a07e1fcb13d2994f0aaaa4c08e7d0d282f05c634 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Thu, 26 Jan 2023 07:57:54 +0000 Subject: [PATCH 13/14] fix Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- .bazelrc | 1 + src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.bazelrc b/.bazelrc index 1c4b9f296b8f..1b98b8f72a3f 100644 --- a/.bazelrc +++ b/.bazelrc @@ -97,6 +97,7 @@ build:tsan --copt -g build:tsan --copt -fno-omit-frame-pointer build:tsan --copt -Wno-uninitialized build:tsan --linkopt -fsanitize=thread +build:tsan --cxxopt="-D_RAY_TSAN_BUILD" # This config is only for running TSAN with LLVM toolchain on Linux. build:tsan-clang --config=tsan build:tsan-clang --config=llvm diff --git a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc index fe05a2a0d0eb..31be1bbd5d6d 100644 --- a/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc @@ -246,7 +246,7 @@ TEST_F(GcsHealthCheckManagerTest, NoRegister) { } TEST_F(GcsHealthCheckManagerTest, StressTest) { -#if defined(__has_feature) && __has_feature(thread_sanitizer) +#ifdef _RAY_TSAN_BUILD GTEST_SKIP() << "Disabled in tsan because of performance"; #endif boost::asio::io_service::work work(io_service); From 70bc03bd6edaedbd73c602fccf9abfbaee1e853e Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Thu, 26 Jan 2023 18:07:31 +0000 Subject: [PATCH 14/14] up Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- src/ray/gcs/gcs_server/gcs_health_check_manager.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_health_check_manager.h b/src/ray/gcs/gcs_server/gcs_health_check_manager.h index d4fc30b40917..d877a217d803 100644 --- a/src/ray/gcs/gcs_server/gcs_health_check_manager.h +++ b/src/ray/gcs/gcs_server/gcs_health_check_manager.h @@ -115,8 +115,6 @@ class GcsHealthCheckManager { /// gRPC related fields std::unique_ptr<::grpc::health::v1::Health::Stub> stub_; - // The context is used in the gRPC callback which is in another - // thread, so we need it to be a shared_ptr. grpc::ClientContext context_; ::grpc::health::v1::HealthCheckRequest request_; ::grpc::health::v1::HealthCheckResponse response_;