Commit 8b47af6

Sparks0219 authored and xinyuangui2 committed
[core] Make ReleaseUnusedBundles Fault Tolerant (ray-project#57786)
Signed-off-by: joshlee <joshlee@anyscale.com>
Signed-off-by: xgui <xgui@anyscale.com>
1 parent 423032c commit 8b47af6

11 files changed, +335 -21 lines changed

python/ray/tests/test_raylet_fault_tolerance.py

Lines changed: 58 additions & 1 deletion
@@ -5,7 +5,11 @@
 import ray
 from ray._private.test_utils import wait_for_condition
 from ray.core.generated import autoscaler_pb2
-from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
+from ray.util.placement_group import placement_group, remove_placement_group
+from ray.util.scheduling_strategies import (
+    NodeAffinitySchedulingStrategy,
+    PlacementGroupSchedulingStrategy,
+)
 
 
 @pytest.mark.parametrize("deterministic_failure", ["request", "response"])
@@ -81,5 +85,58 @@ def node_is_dead():
     wait_for_condition(node_is_dead, timeout=1)
 
 
+# Bundles can be leaked if the gcs dies before the CancelResourceReserve RPCs are
+# propagated to all the raylets. Since this is inherently racy, we block CancelResourceReserve RPCs
+# from ever succeeding to make this test deterministic.
+@pytest.fixture
+def inject_rpc_failures(monkeypatch, request):
+    deterministic_failure = request.param
+    monkeypatch.setenv(
+        "RAY_testing_rpc_failure",
+        "NodeManagerService.grpc_client.ReleaseUnusedBundles=1:"
+        + ("100:0" if deterministic_failure == "request" else "0:100")
+        + ",NodeManagerService.grpc_client.CancelResourceReserve=-1:100:0",
+    )
+
+
+@pytest.mark.parametrize("inject_rpc_failures", ["request", "response"], indirect=True)
+@pytest.mark.parametrize(
+    "ray_start_cluster_head_with_external_redis",
+    [{"num_cpus": 1}],
+    indirect=True,
+)
+def test_release_unused_bundles_idempotent(
+    inject_rpc_failures,
+    ray_start_cluster_head_with_external_redis,
+):
+    cluster = ray_start_cluster_head_with_external_redis
+
+    @ray.remote(num_cpus=1)
+    def task():
+        return "success"
+
+    pg = placement_group(name="test_pg", strategy="PACK", bundles=[{"CPU": 1}])
+
+    result_ref = task.options(
+        scheduling_strategy=PlacementGroupSchedulingStrategy(
+            placement_group=pg,
+            placement_group_bundle_index=0,
+        )
+    ).remote()
+    assert ray.get(result_ref) == "success"
+
+    # Remove the placement group. This will trigger CancelResourceReserve RPCs which need to be blocked
+    # for the placement group bundle to be leaked.
+    remove_placement_group(pg)
+
+    cluster.head_node.kill_gcs_server()
+    # ReleaseUnusedBundles only triggers after GCS restart to clean up potentially leaked bundles.
+    cluster.head_node.start_gcs_server()
+
+    # If the leaked bundle wasn't cleaned up, this task will hang due to resource unavailability
+    result = ray.get(task.remote())
+    assert result == "success"
+
+
 if __name__ == "__main__":
     sys.exit(pytest.main(["-sv", __file__]))

src/ray/protobuf/node_manager.proto

Lines changed: 1 addition & 1 deletion
@@ -490,7 +490,7 @@ service NodeManagerService {
   // that may be leaked. When GCS restarts, it doesn't know which bundles it has leased
   // in the previous lifecycle. In this case, GCS will send a list of bundles that
   // are still needed. And Raylet will release other bundles.
-  // TODO: Need to handle network failure.
+  // Failure: Retries, it's idempotent.
   rpc ReleaseUnusedBundles(ReleaseUnusedBundlesRequest)
       returns (ReleaseUnusedBundlesReply);
   // Get the system config.
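
The new comment shifts the burden to the caller: the GCS keeps retrying ReleaseUnusedBundles until a raylet acknowledges it, which is only safe because the handler is idempotent. A minimal sketch of that caller-side retry pattern, assuming a capped exponential backoff; the helper name and timing values are illustrative, not taken from the actual GCS client code:

#include <algorithm>
#include <chrono>
#include <functional>
#include <thread>

// Keep re-sending an RPC until it succeeds. This is safe only because the
// handler (ReleaseUnusedBundles) is idempotent: a duplicate delivery of the
// same request changes nothing on the raylet.
void RetryUntilOk(const std::function<bool()> &send_rpc) {
  auto backoff = std::chrono::milliseconds(100);
  while (!send_rpc()) {  // false ~ network failure or lost reply
    std::this_thread::sleep_for(backoff);
    backoff = std::min(backoff * 2, std::chrono::milliseconds(5000));
  }
}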

src/ray/raylet/BUILD.bazel

Lines changed: 10 additions & 0 deletions
@@ -110,6 +110,16 @@ ray_cc_library(
     ],
 )
 
+ray_cc_library(
+    name = "fake_worker",
+    hdrs = ["fake_worker.h"],
+    visibility = [":__subpackages__"],
+    deps = [
+        ":worker",
+        "//src/ray/raylet_ipc_client:client_connection",
+    ],
+)
+
 ray_cc_library(
     name = "worker_pool",
     srcs = ["worker_pool.cc"],

src/ray/raylet/fake_worker.h

Lines changed: 127 additions & 0 deletions
@@ -0,0 +1,127 @@
+// Copyright 2025 The Ray Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "ray/raylet/worker.h"
+#include "ray/raylet_ipc_client/client_connection.h"
+
+namespace ray {
+namespace raylet {
+
+class FakeWorker : public WorkerInterface {
+ public:
+  FakeWorker(WorkerID worker_id, int port, instrumented_io_context &io_context)
+      : worker_id_(worker_id),
+        port_(port),
+        proc_(Process::CreateNewDummy()),
+        connection_([&io_context]() {
+          local_stream_socket socket(io_context);
+          return ClientConnection::Create(
+              [](std::shared_ptr<ClientConnection>,
+                 int64_t,
+                 const std::vector<uint8_t> &) {},
+              [](std::shared_ptr<ClientConnection>, const boost::system::error_code &) {},
+              std::move(socket),
+              "fake_worker_connection",
+              {});
+        }()) {}
+
+  WorkerID WorkerId() const override { return worker_id_; }
+  rpc::WorkerType GetWorkerType() const override { return rpc::WorkerType::WORKER; }
+  int Port() const override { return port_; }
+  void SetOwnerAddress(const rpc::Address &address) override {}
+  void GrantLease(const RayLease &granted_lease) override {}
+  void GrantLeaseId(const LeaseID &lease_id) override { lease_id_ = lease_id; }
+  const RayLease &GetGrantedLease() const override { return granted_lease_; }
+  absl::Time GetGrantedLeaseTime() const override { return absl::InfiniteFuture(); }
+  std::optional<bool> GetIsGpu() const override { return std::nullopt; }
+  std::optional<bool> GetIsActorWorker() const override { return std::nullopt; }
+  const std::string IpAddress() const override { return "127.0.0.1"; }
+  void AsyncNotifyGCSRestart() override {}
+  void SetAllocatedInstances(
+      const std::shared_ptr<TaskResourceInstances> &allocated_instances) override {}
+  void SetLifetimeAllocatedInstances(
+      const std::shared_ptr<TaskResourceInstances> &allocated_instances) override {}
+  std::shared_ptr<TaskResourceInstances> GetAllocatedInstances() override {
+    return nullptr;
+  }
+  std::shared_ptr<TaskResourceInstances> GetLifetimeAllocatedInstances() override {
+    return nullptr;
+  }
+  void MarkDead() override {}
+  bool IsDead() const override { return false; }
+  void KillAsync(instrumented_io_context &io_service, bool force) override {}
+  void MarkBlocked() override {}
+  void MarkUnblocked() override {}
+  bool IsBlocked() const override { return false; }
+  Process GetProcess() const override { return proc_; }
+  StartupToken GetStartupToken() const override { return 0; }
+  void SetProcess(Process proc) override {}
+  Language GetLanguage() const override { return Language::PYTHON; }
+  void Connect(int port) override {}
+  void Connect(std::shared_ptr<rpc::CoreWorkerClientInterface> rpc_client) override {}
+  int AssignedPort() const override { return -1; }
+  void SetAssignedPort(int port) override {}
+  const LeaseID &GetGrantedLeaseId() const override { return lease_id_; }
+  const JobID &GetAssignedJobId() const override { return job_id_; }
+  int GetRuntimeEnvHash() const override { return 0; }
+  void AssignActorId(const ActorID &actor_id) override {}
+  const ActorID &GetActorId() const override { return actor_id_; }
+  const std::string GetLeaseIdAsDebugString() const override { return ""; }
+  bool IsDetachedActor() const override { return false; }
+  const std::shared_ptr<ClientConnection> Connection() const override {
+    return connection_;
+  }
+  const rpc::Address &GetOwnerAddress() const override { return owner_address_; }
+  std::optional<pid_t> GetSavedProcessGroupId() const override { return std::nullopt; }
+  void SetSavedProcessGroupId(pid_t pgid) override {}
+  void ActorCallArgWaitComplete(int64_t tag) override {}
+  void ClearAllocatedInstances() override {}
+  void ClearLifetimeAllocatedInstances() override {}
+  const BundleID &GetBundleId() const override { return bundle_id_; }
+  void SetBundleId(const BundleID &bundle_id) override { bundle_id_ = bundle_id; }
+  RayLease &GetGrantedLease() override { return granted_lease_; }
+  bool IsRegistered() override { return false; }
+  rpc::CoreWorkerClientInterface *rpc_client() override { return nullptr; }
+  bool IsAvailableForScheduling() const override { return true; }
+  void SetJobId(const JobID &job_id) override {}
+  const ActorID &GetRootDetachedActorId() const override {
+    return root_detached_actor_id_;
+  }
+
+ protected:
+  void SetStartupToken(StartupToken startup_token) override {}
+
+ private:
+  WorkerID worker_id_;
+  int port_;
+  LeaseID lease_id_;
+  BundleID bundle_id_;
+  Process proc_;
+  std::shared_ptr<ClientConnection> connection_;
+  RayLease granted_lease_;
+  JobID job_id_;
+  ActorID actor_id_;
+  rpc::Address owner_address_;
+  ActorID root_detached_actor_id_;
+};
+
+}  // namespace raylet
+}  // namespace ray
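
For orientation, here is a rough sketch of how a unit test might drop the new FakeWorker into code that expects a WorkerInterface. The helper function, IDs, and port below are placeholders, not values taken from the tests in this commit:

#include <memory>
#include <utility>

#include "ray/raylet/fake_worker.h"

// Hypothetical test helper: build a fake worker and tag it with a bundle so
// bundle-related node manager code paths can be exercised without spawning a
// real worker process.
std::shared_ptr<ray::raylet::FakeWorker> MakeFakeBundleWorker(
    instrumented_io_context &io_context) {
  auto worker = std::make_shared<ray::raylet::FakeWorker>(
      ray::WorkerID::FromRandom(), /*port=*/1234, io_context);
  ray::BundleID bundle_id{ray::PlacementGroupID::Nil(), 0};
  worker->SetBundleId(bundle_id);
  return worker;
}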

src/ray/raylet/main.cc

Lines changed: 8 additions & 0 deletions
@@ -330,6 +330,8 @@ int main(int argc, char *argv[]) {
 
   ray::stats::Gauge task_by_state_counter = ray::core::GetTaskByStateGaugeMetric();
   std::shared_ptr<plasma::PlasmaClient> plasma_client;
+  std::unique_ptr<ray::raylet::PlacementGroupResourceManager>
+      placement_group_resource_manager;
   std::unique_ptr<ray::raylet::NodeManager> node_manager;
   std::unique_ptr<ray::rpc::ClientCallManager> client_call_manager;
   std::unique_ptr<ray::rpc::CoreWorkerClientPool> worker_rpc_pool;
@@ -916,6 +918,11 @@ int main(int argc, char *argv[]) {
       main_service, ray::ParseUrlEndpoint(raylet_socket_name));
   ray::local_stream_socket socket(main_service);
   ray::SetCloseOnExec(acceptor);
+
+  placement_group_resource_manager =
+      std::make_unique<ray::raylet::NewPlacementGroupResourceManager>(
+          *cluster_resource_scheduler);
+
   node_manager = std::make_unique<ray::raylet::NodeManager>(
       main_service,
       raylet_node_id,
@@ -944,6 +951,7 @@ int main(int argc, char *argv[]) {
       std::move(add_process_to_system_cgroup_hook),
       std::move(cgroup_manager),
       shutting_down,
+      *placement_group_resource_manager,
      std::move(acceptor),
      std::move(socket));
 

src/ray/raylet/node_manager.cc

Lines changed: 6 additions & 7 deletions
@@ -177,6 +177,7 @@ NodeManager::NodeManager(
     AddProcessToCgroupHook add_process_to_system_cgroup_hook,
     std::unique_ptr<CgroupManagerInterface> cgroup_manager,
     std::atomic_bool &shutting_down,
+    PlacementGroupResourceManager &placement_group_resource_manager,
     boost::asio::basic_socket_acceptor<local_stream_protocol> acceptor,
     local_stream_socket socket)
     : self_node_id_(self_node_id),
@@ -221,6 +222,7 @@ NodeManager::NodeManager(
       local_lease_manager_(local_lease_manager),
       cluster_lease_manager_(cluster_lease_manager),
       record_metrics_period_ms_(config.record_metrics_period_ms),
+      placement_group_resource_manager_(placement_group_resource_manager),
       next_resource_seq_no_(0),
       ray_syncer_(io_service_, self_node_id_.Binary()),
       worker_killing_policy_(std::make_shared<GroupByOwnerIdWorkerKillingPolicy>()),
@@ -237,9 +239,6 @@ NodeManager::NodeManager(
       socket_(std::move(socket)) {
   RAY_LOG(INFO).WithField(kLogKeyNodeID, self_node_id_) << "Initializing NodeManager";
 
-  placement_group_resource_manager_ =
-      std::make_unique<NewPlacementGroupResourceManager>(cluster_resource_scheduler_);
-
   periodical_runner_->RunFnPeriodically(
       [this]() { cluster_lease_manager_.ScheduleAndGrantLeases(); },
       RayConfig::instance().worker_cap_initial_backoff_delay_ms(),
@@ -674,7 +673,7 @@ void NodeManager::HandleReleaseUnusedBundles(rpc::ReleaseUnusedBundlesRequest re
   }
 
   // Return unused bundle resources.
-  placement_group_resource_manager_->ReturnUnusedBundle(in_use_bundles);
+  placement_group_resource_manager_.ReturnUnusedBundle(in_use_bundles);
 
   send_reply_callback(Status::OK(), nullptr, nullptr);
 }
@@ -1927,7 +1926,7 @@ void NodeManager::HandlePrepareBundleResources(
   }
   RAY_LOG(DEBUG) << "Request to prepare resources for bundles: "
                  << GetDebugStringForBundles(bundle_specs);
-  auto prepared = placement_group_resource_manager_->PrepareBundles(bundle_specs);
+  auto prepared = placement_group_resource_manager_.PrepareBundles(bundle_specs);
   reply->set_success(prepared);
   send_reply_callback(Status::OK(), nullptr, nullptr);
 }
@@ -1943,7 +1942,7 @@ void NodeManager::HandleCommitBundleResources(
   }
   RAY_LOG(DEBUG) << "Request to commit resources for bundles: "
                  << GetDebugStringForBundles(bundle_specs);
-  placement_group_resource_manager_->CommitBundles(bundle_specs);
+  placement_group_resource_manager_.CommitBundles(bundle_specs);
   send_reply_callback(Status::OK(), nullptr, nullptr);
 
   cluster_lease_manager_.ScheduleAndGrantLeases();
@@ -1997,7 +1996,7 @@ void NodeManager::HandleCancelResourceReserve(
     DestroyWorker(worker, rpc::WorkerExitType::INTENDED_SYSTEM_EXIT, message);
   }
 
-  RAY_CHECK_OK(placement_group_resource_manager_->ReturnBundle(bundle_spec));
+  RAY_CHECK_OK(placement_group_resource_manager_.ReturnBundle(bundle_spec));
   cluster_lease_manager_.ScheduleAndGrantLeases();
   send_reply_callback(Status::OK(), nullptr, nullptr);
 }
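
Read alongside the handler above, the idempotency argument is: the request carries the complete set of bundles the GCS still considers in use, the raylet returns everything outside that set, and a retried delivery of the same request finds those bundles already released and does nothing. Below is a self-contained sketch of that semantics (an assumption about how NewPlacementGroupResourceManager behaves, not its actual code):

#include <cstdint>
#include <set>
#include <string>
#include <utility>

using BundleKey = std::pair<std::string, int64_t>;  // stand-in for ray::BundleID

// Release every registered bundle that the caller no longer lists as in use.
// Running this twice with the same in_use set is a no-op the second time,
// which is what makes a blind retry of the RPC safe.
void ReturnUnusedBundlesSketch(std::set<BundleKey> &registered,
                               const std::set<BundleKey> &in_use) {
  for (auto it = registered.begin(); it != registered.end();) {
    if (in_use.count(*it) == 0) {
      it = registered.erase(it);  // free the bundle's resources
    } else {
      ++it;
    }
  }
}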

src/ray/raylet/node_manager.h

Lines changed: 6 additions & 6 deletions
@@ -164,6 +164,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
               AddProcessToCgroupHook add_process_to_system_cgroup_hook,
               std::unique_ptr<CgroupManagerInterface> cgroup_manager,
               std::atomic_bool &shutting_down,
+              PlacementGroupResourceManager &placement_group_resource_manager,
               boost::asio::basic_socket_acceptor<local_stream_protocol> acceptor,
               local_stream_socket socket);
 
@@ -294,6 +295,10 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
                                rpc::CancelWorkerLeaseReply *reply,
                                rpc::SendReplyCallback send_reply_callback) override;
 
+  void HandleReleaseUnusedBundles(rpc::ReleaseUnusedBundlesRequest request,
+                                  rpc::ReleaseUnusedBundlesReply *reply,
+                                  rpc::SendReplyCallback send_reply_callback) override;
+
   void HandleDrainRaylet(rpc::DrainRayletRequest request,
                          rpc::DrainRayletReply *reply,
                          rpc::SendReplyCallback send_reply_callback) override;
@@ -615,11 +620,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
                                     rpc::FormatGlobalMemoryInfoReply *reply,
                                     rpc::SendReplyCallback send_reply_callback) override;
 
-  /// Handle a `ReleaseUnusedBundles` request.
-  void HandleReleaseUnusedBundles(rpc::ReleaseUnusedBundlesRequest request,
-                                  rpc::ReleaseUnusedBundlesReply *reply,
-                                  rpc::SendReplyCallback send_reply_callback) override;
-
   /// Handle a `GetSystemConfig` request.
   void HandleGetSystemConfig(rpc::GetSystemConfigRequest request,
                              rpc::GetSystemConfigReply *reply,
@@ -874,7 +874,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
   uint64_t number_workers_killed_ = 0;
 
   /// Managers all bundle-related operations.
-  std::unique_ptr<PlacementGroupResourceManager> placement_group_resource_manager_;
+  PlacementGroupResourceManager &placement_group_resource_manager_;
 
   /// Next resource broadcast seq no. Non-incrementing sequence numbers
   /// indicate network issues (dropped/duplicated/ooo packets, etc).

src/ray/raylet/placement_group_resource_manager.h

Lines changed: 3 additions & 0 deletions
@@ -83,6 +83,9 @@ class PlacementGroupResourceManager {
   /// Save `BundleSpecification` for cleaning leaked bundles after GCS restart.
   absl::flat_hash_map<BundleID, std::shared_ptr<BundleSpecification>, pair_hash>
       bundle_spec_map_;
+
+  friend bool IsBundleRegistered(const PlacementGroupResourceManager &manager,
+                                 const BundleID &bundle_id);
 };
 
 /// Associated with new scheduler.
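
The friend declaration presumably lets tests inspect bundle_spec_map_ and assert whether a bundle is still registered after a retried release. A guess at the shape of that helper, assuming it only reads the map; the real definition ships with the test code, not this header:

namespace ray {
namespace raylet {

// Hypothetical body for the declared friend: true iff the manager still
// tracks a BundleSpecification for this bundle.
bool IsBundleRegistered(const PlacementGroupResourceManager &manager,
                        const BundleID &bundle_id) {
  return manager.bundle_spec_map_.contains(bundle_id);
}

}  // namespace raylet
}  // namespace ray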

src/ray/raylet/tests/BUILD.bazel

Lines changed: 1 addition & 0 deletions
@@ -162,6 +162,7 @@ ray_cc_test(
         "//src/ray/object_manager/plasma:plasma_client",
         "//src/ray/observability:fake_metric",
         "//src/ray/pubsub:fake_subscriber",
+        "//src/ray/raylet:fake_worker",
         "//src/ray/raylet:local_object_manager_interface",
        "//src/ray/raylet:raylet_lib",
        "//src/ray/raylet/scheduling:cluster_lease_manager",
