[core] Fix gcs health check manager crash when node is removed by node manager. #31917

Merged · 14 commits · Jan 26, 2023
1 change: 1 addition & 0 deletions .bazelrc
@@ -97,6 +97,7 @@ build:tsan --copt -g
build:tsan --copt -fno-omit-frame-pointer
build:tsan --copt -Wno-uninitialized
build:tsan --linkopt -fsanitize=thread
build:tsan --cxxopt="-D_RAY_TSAN_BUILD"
# This config is only for running TSAN with LLVM toolchain on Linux.
build:tsan-clang --config=tsan
build:tsan-clang --config=llvm
2 changes: 1 addition & 1 deletion BUILD.bazel
@@ -1844,7 +1844,7 @@ cc_library(

cc_test(
name = "gcs_health_check_manager_test",
size = "small",
size = "medium",
srcs = [
"src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc",
],
48 changes: 22 additions & 26 deletions src/ray/gcs/gcs_server/gcs_health_check_manager.cc
@@ -53,15 +53,19 @@ void GcsHealthCheckManager::RemoveNode(const NodeID &node_id) {
if (iter == health_check_contexts_.end()) {
return;
}
iter->second->Stop();
health_check_contexts_.erase(iter);
},
"GcsHealthCheckManager::RemoveNode");
}

void GcsHealthCheckManager::FailNode(const NodeID &node_id) {
RAY_LOG(WARNING) << "Node " << node_id << " is dead because the health check failed.";
on_node_death_callback_(node_id);
health_check_contexts_.erase(node_id);
auto iter = health_check_contexts_.find(node_id);
if (iter != health_check_contexts_.end()) {
on_node_death_callback_(node_id);
health_check_contexts_.erase(iter);
}
}

std::vector<NodeID> GcsHealthCheckManager::GetAllNodes() const {
@@ -75,27 +79,23 @@ std::vector<NodeID> GcsHealthCheckManager::GetAllNodes() const {
void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() {
using ::grpc::health::v1::HealthCheckResponse;

context_ = std::make_shared<grpc::ClientContext>();
// Reset the context/request/response for the next request.
context_.~ClientContext();
new (&context_) grpc::ClientContext();
Contributor

If we make context_ a pointer or unique_ptr, would it be more natural?

Contributor Author

I don't think it makes much of a difference, since only one request per node is in flight at any time.
I just followed what gRPC does (https://sourcegraph.com/github.com/grpc/grpc/-/blob/test/cpp/qps/client_sync.cc?L189:26), which I believe is done for performance (stack allocation vs. heap allocation); performance isn't critical here, so either approach would work.
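
For readers following this thread: a minimal, illustrative C++ sketch (not part of the PR diff; the struct names are hypothetical) contrasting the in-place reset pattern the PR uses with the unique_ptr alternative suggested above:

#include <grpcpp/grpcpp.h>

#include <memory>
#include <new>

// Pattern used in this PR (and in gRPC's own benchmark client): keep the
// ClientContext as a plain member and reset it in place between calls.
struct InPlaceResetStyle {
  grpc::ClientContext context_;

  void ResetForNextCall() {
    context_.~ClientContext();               // destroy the old context
    new (&context_) grpc::ClientContext();   // reconstruct it in the same storage
  }
};

// The reviewer's alternative: hold the context behind a unique_ptr and
// allocate a fresh one on the heap before each call.
struct UniquePtrStyle {
  std::unique_ptr<grpc::ClientContext> context_;

  void ResetForNextCall() { context_ = std::make_unique<grpc::ClientContext>(); }
};

Both work here because at most one health-check RPC is in flight per node; the in-place reset merely avoids a heap allocation per check.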

response_.Clear();

auto deadline =
std::chrono::system_clock::now() + std::chrono::milliseconds(manager_->timeout_ms_);
context_->set_deadline(deadline);
context_.set_deadline(deadline);
stub_->async()->Check(
context_.get(),
&request_,
&response_,
[this, stopped = this->stopped_, context = this->context_, now = absl::Now()](
::grpc::Status status) {
&context_, &request_, &response_, [this, now = absl::Now()](::grpc::Status status) {
// This callback is done in gRPC's thread pool.
STATS_health_check_rpc_latency_ms.Record(
absl::ToInt64Milliseconds(absl::Now() - now));
if (status.error_code() == ::grpc::StatusCode::CANCELLED) {
return;
}
manager_->io_service_.post(
[this, stopped, status]() {
// Stopped has to be read in the same thread where it's updated.
if (*stopped) {
[this, status]() {
if (stopped_) {
delete this;
return;
}
RAY_LOG(DEBUG) << "Health check status: " << int(response_.status());
@@ -110,32 +110,28 @@ void GcsHealthCheckManager::HealthCheckContext::StartHealthCheck() {
}

if (health_check_remaining_ == 0) {
manager_->io_service_.post([this]() { manager_->FailNode(node_id_); },
"");
manager_->FailNode(node_id_);
delete this;
} else {
// Do another health check.
timer_.expires_from_now(
boost::posix_time::milliseconds(manager_->period_ms_));
timer_.async_wait([this, stopped](auto ec) {
// We need to check stopped here as well since cancel
// won't impact the queued tasks.
if (ec != boost::asio::error::operation_aborted && !*stopped) {
StartHealthCheck();
}
});
timer_.async_wait([this](auto) { StartHealthCheck(); });
}
},
"HealthCheck");
});
}

void GcsHealthCheckManager::HealthCheckContext::Stop() { stopped_ = true; }

void GcsHealthCheckManager::AddNode(const NodeID &node_id,
std::shared_ptr<grpc::Channel> channel) {
io_service_.dispatch(
[this, channel, node_id]() {
RAY_CHECK(health_check_contexts_.count(node_id) == 0);
auto context = std::make_unique<HealthCheckContext>(this, channel, node_id);
health_check_contexts_.emplace(std::make_pair(node_id, std::move(context)));
auto context = new HealthCheckContext(this, channel, node_id);
health_check_contexts_.emplace(std::make_pair(node_id, context));
},
"GcsHealthCheckManager::AddNode");
}
23 changes: 5 additions & 18 deletions src/ray/gcs/gcs_server/gcs_health_check_manager.h
@@ -91,27 +91,16 @@ class GcsHealthCheckManager {
NodeID node_id)
: manager_(manager),
node_id_(node_id),
stopped_(std::make_shared<bool>(false)),
timer_(manager->io_service_),
health_check_remaining_(manager->failure_threshold_) {
request_.set_service(node_id.Hex());
stub_ = grpc::health::v1::Health::NewStub(channel);
timer_.expires_from_now(
boost::posix_time::milliseconds(manager_->initial_delay_ms_));
timer_.async_wait([this](auto ec) {
if (ec != boost::asio::error::operation_aborted) {
StartHealthCheck();
}
});
timer_.async_wait([this](auto) { StartHealthCheck(); });
}

~HealthCheckContext() {
timer_.cancel();
if (context_ != nullptr) {
context_->TryCancel();
}
*stopped_ = true;
}
void Stop();

private:
void StartHealthCheck();
@@ -121,14 +110,12 @@
NodeID node_id_;

// Whether the health check has stopped.
std::shared_ptr<bool> stopped_;
bool stopped_ = false;

/// gRPC related fields
std::unique_ptr<::grpc::health::v1::Health::Stub> stub_;

// The context is used in the gRPC callback which is in another
// thread, so we need it to be a shared_ptr.
std::shared_ptr<grpc::ClientContext> context_;
grpc::ClientContext context_;
::grpc::health::v1::HealthCheckRequest request_;
::grpc::health::v1::HealthCheckResponse response_;

@@ -146,7 +133,7 @@ class GcsHealthCheckManager {
std::function<void(const NodeID &)> on_node_death_callback_;

/// The context of the health check for each nodes.
absl::flat_hash_map<NodeID, std::unique_ptr<HealthCheckContext>> health_check_contexts_;
absl::flat_hash_map<NodeID, HealthCheckContext *> health_check_contexts_;

/// The delay for the first health check request.
const int64_t initial_delay_ms_;
75 changes: 61 additions & 14 deletions src/ray/gcs/gcs_server/test/gcs_health_check_manager_test.cc
@@ -19,9 +19,13 @@
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/optional.hpp>
#include <boost/thread.hpp>
#include <cstdlib>
#include <unordered_map>

using namespace boost;
using namespace boost::asio;
using namespace boost::asio::ip;

#include <ray/rpc/grpc_server.h>

#include <chrono>
@@ -30,6 +34,20 @@ using namespace boost;
#include "gtest/gtest.h"
#include "ray/gcs/gcs_server/gcs_health_check_manager.h"

int GetFreePort() {
Contributor Author

Fun fact: it was generated by ChatGPT.

io_service io_service;
tcp::acceptor acceptor(io_service);
tcp::endpoint endpoint;

// try to bind to port 0 to find a free port
acceptor.open(tcp::v4());
acceptor.bind(tcp::endpoint(tcp::v4(), 0));
endpoint = acceptor.local_endpoint();
auto port = endpoint.port();
acceptor.close();
return port;
}

using namespace ray;
using namespace std::literals::chrono_literals;

@@ -46,7 +64,6 @@ class GcsHealthCheckManagerTest : public ::testing::Test {
timeout_ms,
period_ms,
failure_threshold);
port = 10000;
}

void TearDown() override {
@@ -65,7 +82,8 @@
NodeID AddServer(bool alive = true) {
std::promise<int> port_promise;
auto node_id = NodeID::FromRandom();

auto port = GetFreePort();
RAY_LOG(INFO) << "Get port " << port;
auto server = std::make_shared<rpc::GrpcServer>(node_id.Hex(), port, true);

auto channel = grpc::CreateChannel("localhost:" + std::to_string(port),
@@ -76,7 +94,6 @@ }
}
servers.emplace(node_id, server);
health_check->AddNode(node_id, channel);
++port;
return node_id;
}

@@ -115,14 +132,13 @@ class GcsHealthCheckManagerTest : public ::testing::Test {
}
}

int port;
instrumented_io_context io_service;
std::unique_ptr<gcs::GcsHealthCheckManager> health_check;
std::unordered_map<NodeID, std::shared_ptr<rpc::GrpcServer>> servers;
std::unordered_set<NodeID> dead_nodes;
const int64_t initial_delay_ms = 1000;
const int64_t timeout_ms = 1000;
const int64_t period_ms = 1000;
const int64_t initial_delay_ms = 100;
const int64_t timeout_ms = 10;
const int64_t period_ms = 10;
const int64_t failure_threshold = 5;
};

@@ -143,8 +159,6 @@ TEST_F(GcsHealthCheckManagerTest, TestBasic) {
Run(2); // One for starting RPC and one for the RPC callback.
}

Run(); // For failure callback.

ASSERT_EQ(1, dead_nodes.size());
ASSERT_TRUE(dead_nodes.count(node_id));
}
@@ -169,8 +183,6 @@ TEST_F(GcsHealthCheckManagerTest, StoppedAndResume) {
}
}

Run(); // For failure callback.

ASSERT_EQ(0, dead_nodes.size());
}

@@ -196,8 +208,6 @@ TEST_F(GcsHealthCheckManagerTest, Crashed) {
Run(2); // One for starting RPC and one for the RPC callback.
}

Run(); // For failure callback.

ASSERT_EQ(1, dead_nodes.size());
ASSERT_TRUE(dead_nodes.count(node_id));
}
@@ -230,12 +240,49 @@ TEST_F(GcsHealthCheckManagerTest, NoRegister) {
Run(2); // One for starting RPC and one for the RPC callback.
}

Run(2);
Run(1);
ASSERT_EQ(1, dead_nodes.size());
ASSERT_TRUE(dead_nodes.count(node_id));
}

TEST_F(GcsHealthCheckManagerTest, StressTest) {
#ifdef _RAY_TSAN_BUILD
GTEST_SKIP() << "Disabled in tsan because of performance";
#endif
boost::asio::io_service::work work(io_service);
std::srand(std::time(nullptr));
auto t = std::make_unique<std::thread>([this]() { this->io_service.run(); });

std::vector<NodeID> alive_nodes;

for (int i = 0; i < 200; ++i) {
alive_nodes.emplace_back(AddServer(true));
std::this_thread::sleep_for(10ms);
}

for (size_t i = 0; i < 20000UL; ++i) {
RAY_LOG(INFO) << "Progress: " << i << "/20000";
auto iter = alive_nodes.begin() + std::rand() % alive_nodes.size();
health_check->RemoveNode(*iter);
DeleteServer(*iter);
alive_nodes.erase(iter);
alive_nodes.emplace_back(AddServer(true));
}
RAY_LOG(INFO) << "Finished!";
io_service.stop();
t->join();
}

int main(int argc, char **argv) {
InitShutdownRAII ray_log_shutdown_raii(ray::RayLog::StartRayLog,
ray::RayLog::ShutDownRayLog,
argv[0],
ray::RayLogLevel::INFO,
/*log_dir=*/"");

ray::RayLog::InstallFailureSignalHandler(argv[0]);
ray::RayLog::InstallTerminateHandler();

::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}