From 1f9b0e44cdc769d96ac534811f6e556b72df0bf2 Mon Sep 17 00:00:00 2001 From: joshlee Date: Thu, 16 Oct 2025 05:55:15 +0000 Subject: [PATCH 1/8] Make ReleaseUnusedBundles Fault Tolerant Signed-off-by: joshlee --- .../ray/tests/test_raylet_fault_tolerance.py | 53 +++++- src/ray/protobuf/node_manager.proto | 2 +- src/ray/raylet/BUILD.bazel | 10 ++ src/ray/raylet/fake_worker.h | 151 ++++++++++++++++++ src/ray/raylet/main.cc | 10 +- src/ray/raylet/node_manager.cc | 15 +- src/ray/raylet/node_manager.h | 14 +- .../raylet/placement_group_resource_manager.h | 3 + src/ray/raylet/tests/BUILD.bazel | 1 + src/ray/raylet/tests/node_manager_test.cc | 108 ++++++++++++- src/ray/raylet/tests/util.h | 11 +- src/ray/raylet_rpc_client/raylet_client.cc | 3 +- 12 files changed, 352 insertions(+), 29 deletions(-) create mode 100644 src/ray/raylet/fake_worker.h diff --git a/python/ray/tests/test_raylet_fault_tolerance.py b/python/ray/tests/test_raylet_fault_tolerance.py index 21fcd1e84a5d..f04a8cb0ee6f 100644 --- a/python/ray/tests/test_raylet_fault_tolerance.py +++ b/python/ray/tests/test_raylet_fault_tolerance.py @@ -3,7 +3,11 @@ import pytest import ray -from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy +from ray.util.placement_group import placement_group, remove_placement_group +from ray.util.scheduling_strategies import ( + NodeAffinitySchedulingStrategy, + PlacementGroupSchedulingStrategy, +) @pytest.mark.parametrize("deterministic_failure", ["request", "response"]) @@ -44,5 +48,52 @@ def simple_task_2(): assert ray.get([result_ref1, result_ref2]) == [0, 1] +@pytest.fixture +def inject_rpc_failures(monkeypatch, request): + monkeypatch.setenv( + "RAY_testing_rpc_failure", + "NodeManagerService.grpc_client.ReleaseUnusedBundles=1:100:0" + + ",NodeManagerService.grpc_client.CancelResourceReserve=100:100:0", + ) + + +@pytest.mark.parametrize( + "ray_start_cluster_head_with_external_redis", + [{"num_cpus": 1}], + indirect=True, +) +def test_release_unused_bundles_idempotent( + inject_rpc_failures, ray_start_cluster_head_with_external_redis +): + cluster = ray_start_cluster_head_with_external_redis + + @ray.remote(num_cpus=1) + def task(): + return "success" + + pg = placement_group(name="test_pg", strategy="PACK", bundles=[{"CPU": 1}]) + assert pg.wait(timeout_seconds=10) + + result_ref = task.options( + scheduling_strategy=PlacementGroupSchedulingStrategy( + placement_group=pg, + placement_group_bundle_index=0, + ) + ).remote() + assert ray.get(result_ref, timeout=10) == "success" + + # Remove the placement group. This will trigger CancelResourceReserve RPCs which need to be blocked + # for the placement group bundle to be leaked. + remove_placement_group(pg) + + cluster.head_node.kill_gcs_server() + # ReleaseUnusedBundles only triggers after GCS restart to clean up potentially leaked bundles. + cluster.head_node.start_gcs_server() + + # If the leaked bundle wasn't cleaned up, this task will hang due to resource unavailability + result = ray.get(task.remote(), timeout=30) + assert result == "success" + + if __name__ == "__main__": sys.exit(pytest.main(["-sv", __file__])) diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index 0ff3ccc59621..8f7cf7476b3d 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -490,7 +490,7 @@ service NodeManagerService { // that may be leaked. When GCS restarts, it doesn't know which bundles it has leased // in the previous lifecycle. In this case, GCS will send a list of bundles that // are still needed. And Raylet will release other bundles. - // TODO: Need to handle network failure. + // Failure: Retries, it's idempotent. rpc ReleaseUnusedBundles(ReleaseUnusedBundlesRequest) returns (ReleaseUnusedBundlesReply); // Get the system config. diff --git a/src/ray/raylet/BUILD.bazel b/src/ray/raylet/BUILD.bazel index a95b3c6693d3..16cd5fad2a30 100644 --- a/src/ray/raylet/BUILD.bazel +++ b/src/ray/raylet/BUILD.bazel @@ -98,6 +98,16 @@ ray_cc_library( ], ) +ray_cc_library( + name = "fake_worker", + hdrs = ["fake_worker.h"], + visibility = [":__subpackages__"], + deps = [ + ":worker", + "//src/ray/raylet_ipc_client:client_connection", + ], +) + ray_cc_library( name = "worker_pool", srcs = ["worker_pool.cc"], diff --git a/src/ray/raylet/fake_worker.h b/src/ray/raylet/fake_worker.h new file mode 100644 index 000000000000..3ae20ecc6db3 --- /dev/null +++ b/src/ray/raylet/fake_worker.h @@ -0,0 +1,151 @@ +// Copyright 2025 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include + +#include "ray/raylet/worker.h" +#include "ray/raylet_ipc_client/client_connection.h" + +namespace ray { +namespace raylet { + +class FakeClientConnection { + public: + /// Create a FakeClientConnection with a dummy socket. + /// + /// \param io_context The IO context to create the socket with. + /// \return A shared pointer to a ClientConnection. + static std::shared_ptr Create(instrumented_io_context &io_context) { + local_stream_socket socket(io_context); + return ClientConnection::Create( + /*message_handler=*/ + [](std::shared_ptr, int64_t, const std::vector &) {}, + /*connection_error_handler=*/ + [](std::shared_ptr, const boost::system::error_code &) {}, + std::move(socket), + /*debug_label=*/"fake_worker_connection", + /*message_type_enum_names=*/{}); + } +}; + +/// A fake implementation of WorkerInterface for testing. +/// This provides a minimal no-op implementation with only the essential fields needed +/// for disconnect/cleanup paths. The key feature is providing a real ClientConnection +/// to prevent segfaults in DestroyWorker callflow. +class FakeWorker : public WorkerInterface { + public: + FakeWorker(WorkerID worker_id, int port, instrumented_io_context &io_context) + : worker_id_(worker_id), + port_(port), + proc_(Process::CreateNewDummy()), + connection_(FakeClientConnection::Create(io_context)) {} + + WorkerID WorkerId() const override { return worker_id_; } + rpc::WorkerType GetWorkerType() const override { return rpc::WorkerType::WORKER; } + int Port() const override { return port_; } + void SetOwnerAddress(const rpc::Address &address) override {} + void GrantLease(const RayLease &granted_lease) override {} + void GrantLeaseId(const LeaseID &lease_id) override { lease_id_ = lease_id; } + const RayLease &GetGrantedLease() const override { + static RayLease empty_lease; + return empty_lease; + } + absl::Time GetGrantedLeaseTime() const override { return absl::InfiniteFuture(); } + std::optional GetIsGpu() const override { return std::nullopt; } + std::optional GetIsActorWorker() const override { return std::nullopt; } + const std::string IpAddress() const override { return "127.0.0.1"; } + void AsyncNotifyGCSRestart() override {} + void SetAllocatedInstances( + const std::shared_ptr &allocated_instances) override {} + void SetLifetimeAllocatedInstances( + const std::shared_ptr &allocated_instances) override {} + std::shared_ptr GetAllocatedInstances() override { + return nullptr; + } + std::shared_ptr GetLifetimeAllocatedInstances() override { + return nullptr; + } + void MarkDead() override {} + bool IsDead() const override { return false; } + void KillAsync(instrumented_io_context &io_service, bool force) override {} + void MarkBlocked() override {} + void MarkUnblocked() override {} + bool IsBlocked() const override { return false; } + Process GetProcess() const override { return proc_; } + StartupToken GetStartupToken() const override { return 0; } + void SetProcess(Process proc) override {} + Language GetLanguage() const override { return Language::PYTHON; } + void Connect(int port) override {} + void Connect(std::shared_ptr rpc_client) override {} + int AssignedPort() const override { return -1; } + void SetAssignedPort(int port) override {} + const LeaseID &GetGrantedLeaseId() const override { return lease_id_; } + const JobID &GetAssignedJobId() const override { + static JobID job_id = JobID::FromInt(1); + return job_id; + } + int GetRuntimeEnvHash() const override { return 0; } + void AssignActorId(const ActorID &actor_id) override {} + const ActorID &GetActorId() const override { + static ActorID actor_id; + return actor_id; + } + const std::string GetLeaseIdAsDebugString() const override { return ""; } + bool IsDetachedActor() const override { return false; } + const std::shared_ptr Connection() const override { + return connection_; + } + const rpc::Address &GetOwnerAddress() const override { + static rpc::Address address; + return address; + } + std::optional GetSavedProcessGroupId() const override { return std::nullopt; } + void SetSavedProcessGroupId(pid_t pgid) override {} + void ActorCallArgWaitComplete(int64_t tag) override {} + void ClearAllocatedInstances() override {} + void ClearLifetimeAllocatedInstances() override {} + const BundleID &GetBundleId() const override { return bundle_id_; } + void SetBundleId(const BundleID &bundle_id) override { bundle_id_ = bundle_id; } + RayLease &GetGrantedLease() override { + static RayLease empty_lease; + return empty_lease; + } + bool IsRegistered() override { return false; } + rpc::CoreWorkerClientInterface *rpc_client() override { return nullptr; } + bool IsAvailableForScheduling() const override { return true; } + void SetJobId(const JobID &job_id) override {} + const ActorID &GetRootDetachedActorId() const override { + static ActorID root_detached_actor_id; + return root_detached_actor_id; + } + + protected: + void SetStartupToken(StartupToken startup_token) override {} + + private: + WorkerID worker_id_; + int port_; + LeaseID lease_id_; + BundleID bundle_id_; + Process proc_; + std::shared_ptr connection_; +}; + +} // namespace raylet +} // namespace ray diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index b99ab969c0e8..ce77803385d6 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -329,6 +329,8 @@ int main(int argc, char *argv[]) { ray::stats::Gauge task_by_state_counter = ray::core::GetTaskByStateGaugeMetric(); std::unique_ptr plasma_client; + std::unique_ptr + placement_group_resource_manager; std::unique_ptr node_manager; std::unique_ptr client_call_manager; std::unique_ptr worker_rpc_pool; @@ -913,6 +915,11 @@ int main(int argc, char *argv[]) { }; plasma_client = std::make_unique(); + + placement_group_resource_manager = + std::make_unique( + *cluster_resource_scheduler); + node_manager = std::make_unique( main_service, raylet_node_id, @@ -939,7 +946,8 @@ int main(int argc, char *argv[]) { /*check_signals=*/nullptr), shutdown_raylet_gracefully, std::move(add_process_to_system_cgroup_hook), - std::move(cgroup_manager)); + std::move(cgroup_manager), + *placement_group_resource_manager); // Initialize the node manager. raylet = std::make_unique(main_service, diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 4ef68f212a28..a2ef62763ec2 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -146,7 +146,8 @@ NodeManager::NodeManager( mutable_object_provider, std::function shutdown_raylet_gracefully, AddProcessToCgroupHook add_process_to_system_cgroup_hook, - std::unique_ptr cgroup_manager) + std::unique_ptr cgroup_manager, + PlacementGroupResourceManager &placement_group_resource_manager) : self_node_id_(self_node_id), self_node_name_(std::move(self_node_name)), io_service_(io_service), @@ -189,6 +190,7 @@ NodeManager::NodeManager( local_lease_manager_(local_lease_manager), cluster_lease_manager_(cluster_lease_manager), record_metrics_period_ms_(config.record_metrics_period_ms), + placement_group_resource_manager_(placement_group_resource_manager), next_resource_seq_no_(0), ray_syncer_(io_service_, self_node_id_.Binary()), worker_killing_policy_(std::make_shared()), @@ -202,9 +204,6 @@ NodeManager::NodeManager( cgroup_manager_(std::move(cgroup_manager)) { RAY_LOG(INFO).WithField(kLogKeyNodeID, self_node_id_) << "Initializing NodeManager"; - placement_group_resource_manager_ = - std::make_unique(cluster_resource_scheduler_); - periodical_runner_->RunFnPeriodically( [this]() { cluster_lease_manager_.ScheduleAndGrantLeases(); }, RayConfig::instance().worker_cap_initial_backoff_delay_ms(), @@ -572,7 +571,7 @@ void NodeManager::HandleReleaseUnusedBundles(rpc::ReleaseUnusedBundlesRequest re } // Return unused bundle resources. - placement_group_resource_manager_->ReturnUnusedBundle(in_use_bundles); + placement_group_resource_manager_.ReturnUnusedBundle(in_use_bundles); send_reply_callback(Status::OK(), nullptr, nullptr); } @@ -1824,7 +1823,7 @@ void NodeManager::HandlePrepareBundleResources( } RAY_LOG(DEBUG) << "Request to prepare resources for bundles: " << GetDebugStringForBundles(bundle_specs); - auto prepared = placement_group_resource_manager_->PrepareBundles(bundle_specs); + auto prepared = placement_group_resource_manager_.PrepareBundles(bundle_specs); reply->set_success(prepared); send_reply_callback(Status::OK(), nullptr, nullptr); } @@ -1840,7 +1839,7 @@ void NodeManager::HandleCommitBundleResources( } RAY_LOG(DEBUG) << "Request to commit resources for bundles: " << GetDebugStringForBundles(bundle_specs); - placement_group_resource_manager_->CommitBundles(bundle_specs); + placement_group_resource_manager_.CommitBundles(bundle_specs); send_reply_callback(Status::OK(), nullptr, nullptr); cluster_lease_manager_.ScheduleAndGrantLeases(); @@ -1894,7 +1893,7 @@ void NodeManager::HandleCancelResourceReserve( DestroyWorker(worker, rpc::WorkerExitType::INTENDED_SYSTEM_EXIT, message); } - RAY_CHECK_OK(placement_group_resource_manager_->ReturnBundle(bundle_spec)); + RAY_CHECK_OK(placement_group_resource_manager_.ReturnBundle(bundle_spec)); cluster_lease_manager_.ScheduleAndGrantLeases(); send_reply_callback(Status::OK(), nullptr, nullptr); } diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 6498219b24a8..22c3541efc43 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -155,7 +155,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler, mutable_object_provider, std::function shutdown_raylet_gracefully, AddProcessToCgroupHook add_process_to_system_cgroup_hook, - std::unique_ptr cgroup_manager); + std::unique_ptr cgroup_manager, + PlacementGroupResourceManager &placement_group_resource_manager); /// Handle an unexpected error that occurred on a client connection. /// The client will be disconnected and no more messages will be processed. @@ -294,6 +295,10 @@ class NodeManager : public rpc::NodeManagerServiceHandler, rpc::CancelWorkerLeaseReply *reply, rpc::SendReplyCallback send_reply_callback) override; + void HandleReleaseUnusedBundles(rpc::ReleaseUnusedBundlesRequest request, + rpc::ReleaseUnusedBundlesReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + private: FRIEND_TEST(NodeManagerStaticTest, TestHandleReportWorkerBacklog); @@ -605,11 +610,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler, rpc::FormatGlobalMemoryInfoReply *reply, rpc::SendReplyCallback send_reply_callback) override; - /// Handle a `ReleaseUnusedBundles` request. - void HandleReleaseUnusedBundles(rpc::ReleaseUnusedBundlesRequest request, - rpc::ReleaseUnusedBundlesReply *reply, - rpc::SendReplyCallback send_reply_callback) override; - /// Handle a `GetSystemConfig` request. void HandleGetSystemConfig(rpc::GetSystemConfigRequest request, rpc::GetSystemConfigReply *reply, @@ -865,7 +865,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler, uint64_t number_workers_killed_ = 0; /// Managers all bundle-related operations. - std::unique_ptr placement_group_resource_manager_; + PlacementGroupResourceManager &placement_group_resource_manager_; /// Next resource broadcast seq no. Non-incrementing sequence numbers /// indicate network issues (dropped/duplicated/ooo packets, etc). diff --git a/src/ray/raylet/placement_group_resource_manager.h b/src/ray/raylet/placement_group_resource_manager.h index 76dc72e5a244..80e2fc955d3f 100644 --- a/src/ray/raylet/placement_group_resource_manager.h +++ b/src/ray/raylet/placement_group_resource_manager.h @@ -83,6 +83,9 @@ class PlacementGroupResourceManager { /// Save `BundleSpecification` for cleaning leaked bundles after GCS restart. absl::flat_hash_map, pair_hash> bundle_spec_map_; + + friend bool IsBundleRegistered(const PlacementGroupResourceManager &manager, + const BundleID &bundle_id); }; /// Associated with new scheduler. diff --git a/src/ray/raylet/tests/BUILD.bazel b/src/ray/raylet/tests/BUILD.bazel index 57f330690039..52eaa4b02c96 100644 --- a/src/ray/raylet/tests/BUILD.bazel +++ b/src/ray/raylet/tests/BUILD.bazel @@ -161,6 +161,7 @@ ray_cc_test( "//src/ray/object_manager/plasma:plasma_client", "//src/ray/observability:fake_metric", "//src/ray/pubsub:fake_subscriber", + "//src/ray/raylet:fake_worker", "//src/ray/raylet:local_object_manager_interface", "//src/ray/raylet:node_manager", "//src/ray/raylet/scheduling:cluster_lease_manager", diff --git a/src/ray/raylet/tests/node_manager_test.cc b/src/ray/raylet/tests/node_manager_test.cc index c93710b817ab..972c00338642 100644 --- a/src/ray/raylet/tests/node_manager_test.cc +++ b/src/ray/raylet/tests/node_manager_test.cc @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -30,12 +31,15 @@ #include "mock/ray/raylet/worker_pool.h" #include "mock/ray/rpc/worker/core_worker_client.h" #include "ray/common/buffer.h" +#include "ray/common/bundle_spec.h" #include "ray/common/flatbuf_utils.h" #include "ray/common/scheduling/cluster_resource_data.h" +#include "ray/common/scheduling/resource_set.h" #include "ray/core_worker_rpc_client/core_worker_client_pool.h" #include "ray/object_manager/plasma/fake_plasma_client.h" #include "ray/observability/fake_metric.h" #include "ray/pubsub/fake_subscriber.h" +#include "ray/raylet/fake_worker.h" #include "ray/raylet/local_object_manager_interface.h" #include "ray/raylet/scheduling/cluster_lease_manager.h" #include "ray/raylet/tests/util.h" @@ -307,6 +311,8 @@ class NodeManagerTest : public ::testing::Test { NodeManagerConfig node_manager_config{}; node_manager_config.maximum_startup_concurrency = 1; node_manager_config.store_socket_name = "test_store_socket"; + node_manager_config.resource_config = + ResourceSet(absl::flat_hash_map{{"CPU", 10.0}}); core_worker_subscriber_ = std::make_unique(); mock_object_directory_ = std::make_unique(); @@ -395,6 +401,9 @@ class NodeManagerTest : public ::testing::Test { [](const ray::RayLease &lease) {}, *local_lease_manager_); + placement_group_resource_manager_ = + std::make_unique(*cluster_resource_scheduler_); + node_manager_ = std::make_unique( io_service_, raylet_node_id_, @@ -419,7 +428,8 @@ class NodeManagerTest : public ::testing::Test { /*shutdown_raylet_gracefully=*/ [](const auto &) {}, [](const std::string &) {}, - nullptr); + nullptr, + *placement_group_resource_manager_); } instrumented_io_context io_service_; @@ -432,6 +442,7 @@ class NodeManagerTest : public ::testing::Test { std::unique_ptr cluster_resource_scheduler_; std::unique_ptr local_lease_manager_; std::unique_ptr cluster_lease_manager_; + std::unique_ptr placement_group_resource_manager_; std::shared_ptr local_object_manager_; std::unique_ptr lease_dependency_manager_; std::unique_ptr mock_gcs_client_ = @@ -1056,7 +1067,7 @@ TEST_F(NodeManagerTest, TestHandleRequestWorkerLeaseIdempotent) { } TEST_F(NodeManagerTest, TestHandleRequestWorkerLeaseInfeasibleIdempotent) { - auto lease_spec = BuildLeaseSpec({{"CPU", 1}}); + auto lease_spec = BuildLeaseSpec({{"CPU", 11}}); lease_spec.GetMutableMessage() .mutable_scheduling_strategy() ->mutable_node_affinity_scheduling_strategy() @@ -1220,6 +1231,99 @@ INSTANTIATE_TEST_SUITE_P(PinObjectIDsIdempotencyVariations, PinObjectIDsIdempotencyTest, testing::Bool()); +bool IsBundleRegistered(const PlacementGroupResourceManager &manager, + const BundleID &bundle_id) { + return manager.bundle_spec_map_.contains(bundle_id); +} + +class ReleaseUnusedBundlesIdempotencyTest : public NodeManagerTest, + public ::testing::WithParamInterface {}; + +TEST_P(ReleaseUnusedBundlesIdempotencyTest, TestHandleReleaseUnusedBundlesIdempotency) { + // bundle_in_use: determines whether we mark a bundle as in use and it is released by + // the placement group resource manager. + // bundle_in_use == true: a bundle is marked as in use in the placement group resource + // manager. ReleaseUnusedBundles is expected to not release the bundle. + // bundle_in_use == false: a bundle is not marked as in use in the placement group + // resource manager. ReleaseUnusedBundles is expected to release the bundle. + bool bundle_in_use = GetParam(); + + auto group_id = PlacementGroupID::Of(JobID::FromInt(1)); + absl::flat_hash_map unit_resource = {{"CPU", 1.0}}; + + auto bundle_id = BundleID(group_id, 1); + rpc::Bundle bundle; + auto *bundle_id_msg = bundle.mutable_bundle_id(); + bundle_id_msg->set_placement_group_id(group_id.Binary()); + bundle_id_msg->set_bundle_index(1); + auto unit_resources = bundle.mutable_unit_resources(); + for (const auto &[key, value] : unit_resource) { + unit_resources->insert({key, value}); + } + auto bundle_spec = std::make_shared(bundle); + ASSERT_TRUE(placement_group_resource_manager_->PrepareBundles({bundle_spec})); + placement_group_resource_manager_->CommitBundles({bundle_spec}); + + EXPECT_TRUE(IsBundleRegistered(*placement_group_resource_manager_, bundle_id)); + + WorkerID worker_id = WorkerID::FromRandom(); + LeaseID lease_id = LeaseID::FromRandom(); + auto worker = std::make_shared(worker_id, 0, io_service_); + worker->SetBundleId(bundle_id); + worker->GrantLeaseId(lease_id); + leased_workers_.emplace(lease_id, worker); + + rpc::ReleaseUnusedBundlesRequest request; + if (bundle_in_use) { + auto *bundle_entry = request.add_bundles_in_use(); + bundle_entry->mutable_bundle_id()->set_placement_group_id(group_id.Binary()); + bundle_entry->mutable_bundle_id()->set_bundle_index(1); + } else { + // When the bundle is not in use, the worker associated with that bundle is destroyed + // hence need to mock the GetRegisteredWorker call to return the worker. + EXPECT_CALL( + mock_worker_pool_, + GetRegisteredWorker(testing::An &>())) + .WillOnce(Return(worker)); + } + + rpc::ReleaseUnusedBundlesReply reply1; + node_manager_->HandleReleaseUnusedBundles( + request, &reply1, [](Status s, std::function, std::function) { + EXPECT_TRUE(s.ok()); + }); + + if (bundle_in_use) { + EXPECT_TRUE(leased_workers_.contains(lease_id)); + EXPECT_EQ(leased_workers_.size(), 1); + EXPECT_TRUE(IsBundleRegistered(*placement_group_resource_manager_, bundle_id)); + } else { + EXPECT_FALSE(leased_workers_.contains(lease_id)); + EXPECT_EQ(leased_workers_.size(), 0); + EXPECT_FALSE(IsBundleRegistered(*placement_group_resource_manager_, bundle_id)); + } + + rpc::ReleaseUnusedBundlesReply reply2; + node_manager_->HandleReleaseUnusedBundles( + request, &reply2, [](Status s, std::function, std::function) { + EXPECT_TRUE(s.ok()); + }); + + if (bundle_in_use) { + EXPECT_TRUE(leased_workers_.contains(lease_id)); + EXPECT_EQ(leased_workers_.size(), 1); + EXPECT_TRUE(IsBundleRegistered(*placement_group_resource_manager_, bundle_id)); + } else { + EXPECT_FALSE(leased_workers_.contains(lease_id)); + EXPECT_EQ(leased_workers_.size(), 0); + EXPECT_FALSE(IsBundleRegistered(*placement_group_resource_manager_, bundle_id)); + } +} + +INSTANTIATE_TEST_SUITE_P(ReleaseUnusedBundlesIdempotencyVariations, + ReleaseUnusedBundlesIdempotencyTest, + ::testing::Bool()); + } // namespace ray::raylet int main(int argc, char **argv) { diff --git a/src/ray/raylet/tests/util.h b/src/ray/raylet/tests/util.h index ee4a88d831ea..92c7cf6f53d5 100644 --- a/src/ray/raylet/tests/util.h +++ b/src/ray/raylet/tests/util.h @@ -121,10 +121,7 @@ class MockWorker : public WorkerInterface { void AssignActorId(const ActorID &actor_id) override { RAY_CHECK(false) << "Method unused"; } - const ActorID &GetActorId() const override { - RAY_CHECK(false) << "Method unused"; - return ActorID::Nil(); - } + const ActorID &GetActorId() const override { return actor_id_; } const std::string GetLeaseIdAsDebugString() const override { RAY_CHECK(false) << "Method unused"; return ""; @@ -149,10 +146,7 @@ class MockWorker : public WorkerInterface { lifetime_allocated_instances_ = nullptr; } - const BundleID &GetBundleId() const override { - RAY_CHECK(false) << "Method unused"; - return bundle_id_; - } + const BundleID &GetBundleId() const override { return bundle_id_; } void SetBundleId(const BundleID &bundle_id) override { bundle_id_ = bundle_id; } @@ -200,6 +194,7 @@ class MockWorker : public WorkerInterface { int runtime_env_hash_; LeaseID lease_id_; JobID job_id_; + ActorID actor_id_; ActorID root_detached_actor_id_; Process proc_; std::atomic killing_ = false; diff --git a/src/ray/raylet_rpc_client/raylet_client.cc b/src/ray/raylet_rpc_client/raylet_client.cc index 8bf6790d45e8..e2c2c22102cf 100644 --- a/src/ray/raylet_rpc_client/raylet_client.cc +++ b/src/ray/raylet_rpc_client/raylet_client.cc @@ -303,7 +303,8 @@ void RayletClient::ReleaseUnusedBundles( for (auto &bundle : bundles_in_use) { request.add_bundles_in_use()->CopyFrom(bundle); } - INVOKE_RPC_CALL( + INVOKE_RETRYABLE_RPC_CALL( + retryable_grpc_client_, NodeManagerService, ReleaseUnusedBundles, request, From ba5f6b4f386b6871c4b2c3ff27195606bc2731a0 Mon Sep 17 00:00:00 2001 From: joshlee Date: Thu, 16 Oct 2025 06:09:49 +0000 Subject: [PATCH 2/8] nits Signed-off-by: joshlee --- .../ray/tests/test_raylet_fault_tolerance.py | 4 +- src/ray/raylet/fake_worker.h | 37 ++++++------------- src/ray/raylet/tests/util.h | 11 ++++-- 3 files changed, 21 insertions(+), 31 deletions(-) diff --git a/python/ray/tests/test_raylet_fault_tolerance.py b/python/ray/tests/test_raylet_fault_tolerance.py index f04a8cb0ee6f..e80f17147bf6 100644 --- a/python/ray/tests/test_raylet_fault_tolerance.py +++ b/python/ray/tests/test_raylet_fault_tolerance.py @@ -65,6 +65,7 @@ def inject_rpc_failures(monkeypatch, request): def test_release_unused_bundles_idempotent( inject_rpc_failures, ray_start_cluster_head_with_external_redis ): + # NOTE: Not testing response failure because the leaked bundle is cleaned up anyway cluster = ray_start_cluster_head_with_external_redis @ray.remote(num_cpus=1) @@ -72,7 +73,6 @@ def task(): return "success" pg = placement_group(name="test_pg", strategy="PACK", bundles=[{"CPU": 1}]) - assert pg.wait(timeout_seconds=10) result_ref = task.options( scheduling_strategy=PlacementGroupSchedulingStrategy( @@ -80,7 +80,7 @@ def task(): placement_group_bundle_index=0, ) ).remote() - assert ray.get(result_ref, timeout=10) == "success" + assert ray.get(result_ref) == "success" # Remove the placement group. This will trigger CancelResourceReserve RPCs which need to be blocked # for the placement group bundle to be leaked. diff --git a/src/ray/raylet/fake_worker.h b/src/ray/raylet/fake_worker.h index 3ae20ecc6db3..59943d858cc2 100644 --- a/src/ray/raylet/fake_worker.h +++ b/src/ray/raylet/fake_worker.h @@ -44,10 +44,6 @@ class FakeClientConnection { } }; -/// A fake implementation of WorkerInterface for testing. -/// This provides a minimal no-op implementation with only the essential fields needed -/// for disconnect/cleanup paths. The key feature is providing a real ClientConnection -/// to prevent segfaults in DestroyWorker callflow. class FakeWorker : public WorkerInterface { public: FakeWorker(WorkerID worker_id, int port, instrumented_io_context &io_context) @@ -62,10 +58,7 @@ class FakeWorker : public WorkerInterface { void SetOwnerAddress(const rpc::Address &address) override {} void GrantLease(const RayLease &granted_lease) override {} void GrantLeaseId(const LeaseID &lease_id) override { lease_id_ = lease_id; } - const RayLease &GetGrantedLease() const override { - static RayLease empty_lease; - return empty_lease; - } + const RayLease &GetGrantedLease() const override { return granted_lease_; } absl::Time GetGrantedLeaseTime() const override { return absl::InfiniteFuture(); } std::optional GetIsGpu() const override { return std::nullopt; } std::optional GetIsActorWorker() const override { return std::nullopt; } @@ -96,25 +89,16 @@ class FakeWorker : public WorkerInterface { int AssignedPort() const override { return -1; } void SetAssignedPort(int port) override {} const LeaseID &GetGrantedLeaseId() const override { return lease_id_; } - const JobID &GetAssignedJobId() const override { - static JobID job_id = JobID::FromInt(1); - return job_id; - } + const JobID &GetAssignedJobId() const override { return job_id_; } int GetRuntimeEnvHash() const override { return 0; } void AssignActorId(const ActorID &actor_id) override {} - const ActorID &GetActorId() const override { - static ActorID actor_id; - return actor_id; - } + const ActorID &GetActorId() const override { return actor_id_; } const std::string GetLeaseIdAsDebugString() const override { return ""; } bool IsDetachedActor() const override { return false; } const std::shared_ptr Connection() const override { return connection_; } - const rpc::Address &GetOwnerAddress() const override { - static rpc::Address address; - return address; - } + const rpc::Address &GetOwnerAddress() const override { return owner_address_; } std::optional GetSavedProcessGroupId() const override { return std::nullopt; } void SetSavedProcessGroupId(pid_t pgid) override {} void ActorCallArgWaitComplete(int64_t tag) override {} @@ -122,17 +106,13 @@ class FakeWorker : public WorkerInterface { void ClearLifetimeAllocatedInstances() override {} const BundleID &GetBundleId() const override { return bundle_id_; } void SetBundleId(const BundleID &bundle_id) override { bundle_id_ = bundle_id; } - RayLease &GetGrantedLease() override { - static RayLease empty_lease; - return empty_lease; - } + RayLease &GetGrantedLease() override { return granted_lease_; } bool IsRegistered() override { return false; } rpc::CoreWorkerClientInterface *rpc_client() override { return nullptr; } bool IsAvailableForScheduling() const override { return true; } void SetJobId(const JobID &job_id) override {} const ActorID &GetRootDetachedActorId() const override { - static ActorID root_detached_actor_id; - return root_detached_actor_id; + return root_detached_actor_id_; } protected: @@ -145,6 +125,11 @@ class FakeWorker : public WorkerInterface { BundleID bundle_id_; Process proc_; std::shared_ptr connection_; + RayLease granted_lease_; + JobID job_id_; + ActorID actor_id_; + rpc::Address owner_address_; + ActorID root_detached_actor_id_; }; } // namespace raylet diff --git a/src/ray/raylet/tests/util.h b/src/ray/raylet/tests/util.h index 92c7cf6f53d5..ee4a88d831ea 100644 --- a/src/ray/raylet/tests/util.h +++ b/src/ray/raylet/tests/util.h @@ -121,7 +121,10 @@ class MockWorker : public WorkerInterface { void AssignActorId(const ActorID &actor_id) override { RAY_CHECK(false) << "Method unused"; } - const ActorID &GetActorId() const override { return actor_id_; } + const ActorID &GetActorId() const override { + RAY_CHECK(false) << "Method unused"; + return ActorID::Nil(); + } const std::string GetLeaseIdAsDebugString() const override { RAY_CHECK(false) << "Method unused"; return ""; @@ -146,7 +149,10 @@ class MockWorker : public WorkerInterface { lifetime_allocated_instances_ = nullptr; } - const BundleID &GetBundleId() const override { return bundle_id_; } + const BundleID &GetBundleId() const override { + RAY_CHECK(false) << "Method unused"; + return bundle_id_; + } void SetBundleId(const BundleID &bundle_id) override { bundle_id_ = bundle_id; } @@ -194,7 +200,6 @@ class MockWorker : public WorkerInterface { int runtime_env_hash_; LeaseID lease_id_; JobID job_id_; - ActorID actor_id_; ActorID root_detached_actor_id_; Process proc_; std::atomic killing_ = false; From a2d53b2389646bc7cae1196315fa89e47ab5f225 Mon Sep 17 00:00:00 2001 From: joshlee Date: Thu, 16 Oct 2025 06:12:57 +0000 Subject: [PATCH 3/8] nits Signed-off-by: joshlee --- src/ray/raylet/fake_worker.h | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/ray/raylet/fake_worker.h b/src/ray/raylet/fake_worker.h index 59943d858cc2..03f64bd566d0 100644 --- a/src/ray/raylet/fake_worker.h +++ b/src/ray/raylet/fake_worker.h @@ -27,20 +27,14 @@ namespace raylet { class FakeClientConnection { public: - /// Create a FakeClientConnection with a dummy socket. - /// - /// \param io_context The IO context to create the socket with. - /// \return A shared pointer to a ClientConnection. static std::shared_ptr Create(instrumented_io_context &io_context) { local_stream_socket socket(io_context); return ClientConnection::Create( - /*message_handler=*/ [](std::shared_ptr, int64_t, const std::vector &) {}, - /*connection_error_handler=*/ [](std::shared_ptr, const boost::system::error_code &) {}, std::move(socket), - /*debug_label=*/"fake_worker_connection", - /*message_type_enum_names=*/{}); + "fake_worker_connection", + {}); } }; From 315511d8888eaed2f6a2b57a47754fa2adb761d5 Mon Sep 17 00:00:00 2001 From: joshlee Date: Thu, 16 Oct 2025 19:56:47 +0000 Subject: [PATCH 4/8] nit Signed-off-by: joshlee --- src/ray/raylet/fake_worker.h | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/src/ray/raylet/fake_worker.h b/src/ray/raylet/fake_worker.h index 03f64bd566d0..ee07478a9f2f 100644 --- a/src/ray/raylet/fake_worker.h +++ b/src/ray/raylet/fake_worker.h @@ -25,26 +25,23 @@ namespace ray { namespace raylet { -class FakeClientConnection { - public: - static std::shared_ptr Create(instrumented_io_context &io_context) { - local_stream_socket socket(io_context); - return ClientConnection::Create( - [](std::shared_ptr, int64_t, const std::vector &) {}, - [](std::shared_ptr, const boost::system::error_code &) {}, - std::move(socket), - "fake_worker_connection", - {}); - } -}; - class FakeWorker : public WorkerInterface { public: FakeWorker(WorkerID worker_id, int port, instrumented_io_context &io_context) : worker_id_(worker_id), port_(port), proc_(Process::CreateNewDummy()), - connection_(FakeClientConnection::Create(io_context)) {} + connection_([&io_context]() { + local_stream_socket socket(io_context); + return ClientConnection::Create( + [](std::shared_ptr, + int64_t, + const std::vector &) {}, + [](std::shared_ptr, const boost::system::error_code &) {}, + std::move(socket), + "fake_worker_connection", + {}); + }()) {} WorkerID WorkerId() const override { return worker_id_; } rpc::WorkerType GetWorkerType() const override { return rpc::WorkerType::WORKER; } From 892e527b2ada3eda7cef51cb550d0cfcd5241089 Mon Sep 17 00:00:00 2001 From: joshlee Date: Thu, 16 Oct 2025 22:34:54 +0000 Subject: [PATCH 5/8] Addressing comments Signed-off-by: joshlee --- .../ray/tests/test_raylet_fault_tolerance.py | 7 +++++-- src/ray/raylet/tests/node_manager_test.cc | 18 +++++++++++------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/python/ray/tests/test_raylet_fault_tolerance.py b/python/ray/tests/test_raylet_fault_tolerance.py index e80f17147bf6..87b99b42e50e 100644 --- a/python/ray/tests/test_raylet_fault_tolerance.py +++ b/python/ray/tests/test_raylet_fault_tolerance.py @@ -48,12 +48,15 @@ def simple_task_2(): assert ray.get([result_ref1, result_ref2]) == [0, 1] +# Bundles can be leaked if the gcs dies before the CancelResourceReserve RPCs are +# propagated to all the raylets. Since this is inherently racy, we block CancelResourceReserve RPCs +# from ever succeeding to make this test deterministic. @pytest.fixture -def inject_rpc_failures(monkeypatch, request): +def inject_rpc_failures(monkeypatch): monkeypatch.setenv( "RAY_testing_rpc_failure", "NodeManagerService.grpc_client.ReleaseUnusedBundles=1:100:0" - + ",NodeManagerService.grpc_client.CancelResourceReserve=100:100:0", + + ",NodeManagerService.grpc_client.CancelResourceReserve=-1:100:0", ) diff --git a/src/ray/raylet/tests/node_manager_test.cc b/src/ray/raylet/tests/node_manager_test.cc index b0e58e9cae9c..44817004759b 100644 --- a/src/ray/raylet/tests/node_manager_test.cc +++ b/src/ray/raylet/tests/node_manager_test.cc @@ -53,6 +53,8 @@ using ::testing::Return; namespace { +constexpr double kTestTotalCpuResource = 10.0; + class FakeLocalObjectManager : public LocalObjectManagerInterface { public: FakeLocalObjectManager( @@ -312,8 +314,8 @@ class NodeManagerTest : public ::testing::Test { NodeManagerConfig node_manager_config{}; node_manager_config.maximum_startup_concurrency = 1; node_manager_config.store_socket_name = "test_store_socket"; - node_manager_config.resource_config = - ResourceSet(absl::flat_hash_map{{"CPU", 10.0}}); + node_manager_config.resource_config = ResourceSet( + absl::flat_hash_map{{"CPU", kTestTotalCpuResource}}); core_worker_subscriber_ = std::make_unique(); mock_object_directory_ = std::make_unique(); @@ -685,8 +687,8 @@ TEST_F(NodeManagerTest, TestPinningAnObjectPendingDeletionFails) { TEST_F(NodeManagerTest, TestConsumeSyncMessage) { // Create and wrap a mock resource view sync message. syncer::ResourceViewSyncMessage payload; - payload.mutable_resources_total()->insert({"CPU", 10.0}); - payload.mutable_resources_available()->insert({"CPU", 10.0}); + payload.mutable_resources_total()->insert({"CPU", kTestTotalCpuResource}); + payload.mutable_resources_available()->insert({"CPU", kTestTotalCpuResource}); payload.mutable_labels()->insert({"label1", "value1"}); std::string serialized; @@ -705,8 +707,10 @@ TEST_F(NodeManagerTest, TestConsumeSyncMessage) { cluster_resource_scheduler_->GetClusterResourceManager().GetNodeResources( scheduling::NodeID(node_id.Binary())); EXPECT_EQ(node_resources.labels.at("label1"), "value1"); - EXPECT_EQ(node_resources.total.Get(scheduling::ResourceID("CPU")).Double(), 10.0); - EXPECT_EQ(node_resources.available.Get(scheduling::ResourceID("CPU")).Double(), 10.0); + EXPECT_EQ(node_resources.total.Get(scheduling::ResourceID("CPU")).Double(), + kTestTotalCpuResource); + EXPECT_EQ(node_resources.available.Get(scheduling::ResourceID("CPU")).Double(), + kTestTotalCpuResource); } TEST_F(NodeManagerTest, TestResizeLocalResourceInstancesSuccessful) { @@ -1072,7 +1076,7 @@ TEST_F(NodeManagerTest, TestHandleRequestWorkerLeaseIdempotent) { } TEST_F(NodeManagerTest, TestHandleRequestWorkerLeaseInfeasibleIdempotent) { - auto lease_spec = BuildLeaseSpec({{"CPU", 11}}); + auto lease_spec = BuildLeaseSpec({{"CPU", kTestTotalCpuResource + 1}}); lease_spec.GetMutableMessage() .mutable_scheduling_strategy() ->mutable_node_affinity_scheduling_strategy() From 3dc7333a36d1431545ab87bd21f9e25da3873795 Mon Sep 17 00:00:00 2001 From: joshlee Date: Mon, 20 Oct 2025 18:28:09 +0000 Subject: [PATCH 6/8] Addressing comments Signed-off-by: joshlee --- python/ray/tests/test_raylet_fault_tolerance.py | 11 +++++++---- src/ray/raylet/tests/node_manager_test.cc | 10 +++++----- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/python/ray/tests/test_raylet_fault_tolerance.py b/python/ray/tests/test_raylet_fault_tolerance.py index 87b99b42e50e..a9907be8c5a1 100644 --- a/python/ray/tests/test_raylet_fault_tolerance.py +++ b/python/ray/tests/test_raylet_fault_tolerance.py @@ -52,23 +52,26 @@ def simple_task_2(): # propagated to all the raylets. Since this is inherently racy, we block CancelResourceReserve RPCs # from ever succeeding to make this test deterministic. @pytest.fixture -def inject_rpc_failures(monkeypatch): +def inject_rpc_failures(monkeypatch, request): + deterministic_failure = request.param monkeypatch.setenv( "RAY_testing_rpc_failure", - "NodeManagerService.grpc_client.ReleaseUnusedBundles=1:100:0" + "NodeManagerService.grpc_client.ReleaseUnusedBundles=1:" + + ("100:0" if deterministic_failure == "request" else "0:100") + ",NodeManagerService.grpc_client.CancelResourceReserve=-1:100:0", ) +@pytest.mark.parametrize("inject_rpc_failures", ["request", "response"], indirect=True) @pytest.mark.parametrize( "ray_start_cluster_head_with_external_redis", [{"num_cpus": 1}], indirect=True, ) def test_release_unused_bundles_idempotent( - inject_rpc_failures, ray_start_cluster_head_with_external_redis + inject_rpc_failures, + ray_start_cluster_head_with_external_redis, ): - # NOTE: Not testing response failure because the leaked bundle is cleaned up anyway cluster = ray_start_cluster_head_with_external_redis @ray.remote(num_cpus=1) diff --git a/src/ray/raylet/tests/node_manager_test.cc b/src/ray/raylet/tests/node_manager_test.cc index 4723a9787dbe..6860a3248fa7 100644 --- a/src/ray/raylet/tests/node_manager_test.cc +++ b/src/ray/raylet/tests/node_manager_test.cc @@ -1318,10 +1318,10 @@ bool IsBundleRegistered(const PlacementGroupResourceManager &manager, return manager.bundle_spec_map_.contains(bundle_id); } -class ReleaseUnusedBundlesIdempotencyTest : public NodeManagerTest, - public ::testing::WithParamInterface {}; +class ReleaseUnusedBundlesRetriesTest : public NodeManagerTest, + public ::testing::WithParamInterface {}; -TEST_P(ReleaseUnusedBundlesIdempotencyTest, TestHandleReleaseUnusedBundlesIdempotency) { +TEST_P(ReleaseUnusedBundlesRetriesTest, TestHandleReleaseUnusedBundlesRetries) { // bundle_in_use: determines whether we mark a bundle as in use and it is released by // the placement group resource manager. // bundle_in_use == true: a bundle is marked as in use in the placement group resource @@ -1402,8 +1402,8 @@ TEST_P(ReleaseUnusedBundlesIdempotencyTest, TestHandleReleaseUnusedBundlesIdempo } } -INSTANTIATE_TEST_SUITE_P(ReleaseUnusedBundlesIdempotencyVariations, - ReleaseUnusedBundlesIdempotencyTest, +INSTANTIATE_TEST_SUITE_P(ReleaseUnusedBundlesRetriesVariations, + ReleaseUnusedBundlesRetriesTest, ::testing::Bool()); } // namespace ray::raylet From d3c717dac5f8edd757c581816ed0151976eed15c Mon Sep 17 00:00:00 2001 From: joshlee Date: Mon, 20 Oct 2025 20:02:47 +0000 Subject: [PATCH 7/8] Remove timeout Signed-off-by: joshlee --- python/ray/tests/test_raylet_fault_tolerance.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/tests/test_raylet_fault_tolerance.py b/python/ray/tests/test_raylet_fault_tolerance.py index bebc3fabae68..5aa7d5f7f7e0 100644 --- a/python/ray/tests/test_raylet_fault_tolerance.py +++ b/python/ray/tests/test_raylet_fault_tolerance.py @@ -134,7 +134,7 @@ def task(): cluster.head_node.start_gcs_server() # If the leaked bundle wasn't cleaned up, this task will hang due to resource unavailability - result = ray.get(task.remote(), timeout=30) + result = ray.get(task.remote()) assert result == "success" From c6e8ad90f24c196392ca76f6f5ee49d3bbcfc398 Mon Sep 17 00:00:00 2001 From: joshlee Date: Mon, 20 Oct 2025 20:11:22 +0000 Subject: [PATCH 8/8] nits Signed-off-by: joshlee --- src/ray/raylet/node_manager.h | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index eeb01a0c60f0..b53370d3da4e 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -299,7 +299,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler, rpc::ReleaseUnusedBundlesReply *reply, rpc::SendReplyCallback send_reply_callback) override; - /// Handle a `DrainRaylet` request. void HandleDrainRaylet(rpc::DrainRayletRequest request, rpc::DrainRayletReply *reply, rpc::SendReplyCallback send_reply_callback) override;