59 changes: 58 additions & 1 deletion python/ray/tests/test_raylet_fault_tolerance.py
@@ -5,7 +5,11 @@
import ray
from ray._private.test_utils import wait_for_condition
from ray.core.generated import autoscaler_pb2
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
from ray.util.placement_group import placement_group, remove_placement_group
from ray.util.scheduling_strategies import (
NodeAffinitySchedulingStrategy,
PlacementGroupSchedulingStrategy,
)


@pytest.mark.parametrize("deterministic_failure", ["request", "response"])
@@ -81,5 +85,58 @@ def node_is_dead():
wait_for_condition(node_is_dead, timeout=1)


# Bundles can be leaked if the GCS dies before the CancelResourceReserve RPCs
# have propagated to all the raylets. Since this is inherently racy, we block
# CancelResourceReserve RPCs from ever succeeding to make the test deterministic.
@pytest.fixture
def inject_rpc_failures(monkeypatch, request):
deterministic_failure = request.param
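    # The injected value follows Ray's RPC failure-injection format (assumed
    # from the hook's usage here): <method>=<max_failures>:<req_fail_%>:<resp_fail_%>,
    # where -1 means fail indefinitely. We fail ReleaseUnusedBundles once, on
    # either the request or the response side, and block CancelResourceReserve
    # from ever succeeding.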
monkeypatch.setenv(
"RAY_testing_rpc_failure",
"NodeManagerService.grpc_client.ReleaseUnusedBundles=1:"
+ ("100:0" if deterministic_failure == "request" else "0:100")
+ ",NodeManagerService.grpc_client.CancelResourceReserve=-1:100:0",
)


@pytest.mark.parametrize("inject_rpc_failures", ["request", "response"], indirect=True)
@pytest.mark.parametrize(
"ray_start_cluster_head_with_external_redis",
[{"num_cpus": 1}],
indirect=True,
)
def test_release_unused_bundles_idempotent(
inject_rpc_failures,
ray_start_cluster_head_with_external_redis,
):
cluster = ray_start_cluster_head_with_external_redis

@ray.remote(num_cpus=1)
def task():
return "success"

pg = placement_group(name="test_pg", strategy="PACK", bundles=[{"CPU": 1}])

result_ref = task.options(
scheduling_strategy=PlacementGroupSchedulingStrategy(
placement_group=pg,
placement_group_bundle_index=0,
)
).remote()
assert ray.get(result_ref) == "success"

    # Remove the placement group. This triggers CancelResourceReserve RPCs,
    # which must be blocked for the placement group bundle to be leaked.
remove_placement_group(pg)

cluster.head_node.kill_gcs_server()
# ReleaseUnusedBundles only triggers after GCS restart to clean up potentially leaked bundles.
cluster.head_node.start_gcs_server()

    # If the leaked bundle wasn't cleaned up, this task hangs because the
    # bundle still holds the node's only CPU.
result = ray.get(task.remote())
assert result == "success"


if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))
2 changes: 1 addition & 1 deletion src/ray/protobuf/node_manager.proto
@@ -490,7 +490,7 @@ service NodeManagerService {
// that may be leaked. When GCS restarts, it doesn't know which bundles it has leased
// in the previous lifecycle. In this case, GCS will send a list of bundles that
// are still needed. And Raylet will release other bundles.
// TODO: Need to handle network failure.
// Failure: Safe to retry; the handler is idempotent.
rpc ReleaseUnusedBundles(ReleaseUnusedBundlesRequest)
returns (ReleaseUnusedBundlesReply);
// Get the system config.
10 changes: 10 additions & 0 deletions src/ray/raylet/BUILD.bazel
@@ -110,6 +110,16 @@ ray_cc_library(
],
)

ray_cc_library(
name = "fake_worker",
hdrs = ["fake_worker.h"],
visibility = [":__subpackages__"],
deps = [
":worker",
"//src/ray/raylet_ipc_client:client_connection",
],
)

ray_cc_library(
name = "worker_pool",
srcs = ["worker_pool.cc"],
127 changes: 127 additions & 0 deletions src/ray/raylet/fake_worker.h
@@ -0,0 +1,127 @@
// Copyright 2025 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "ray/raylet/worker.h"
#include "ray/raylet_ipc_client/client_connection.h"

namespace ray {
namespace raylet {

class FakeWorker : public WorkerInterface {
public:
FakeWorker(WorkerID worker_id, int port, instrumented_io_context &io_context)
: worker_id_(worker_id),
port_(port),
proc_(Process::CreateNewDummy()),
connection_([&io_context]() {
local_stream_socket socket(io_context);
Contributor: are we creating a real socket in a fake?

Contributor Author: Yup I am, seems a bit of a pain to rework it not to take in a real socket so just passed real socket + dummy lambdas as a compromise
return ClientConnection::Create(
[](std::shared_ptr<ClientConnection>,
int64_t,
const std::vector<uint8_t> &) {},
[](std::shared_ptr<ClientConnection>, const boost::system::error_code &) {},
std::move(socket),
"fake_worker_connection",
{});
}()) {}

WorkerID WorkerId() const override { return worker_id_; }
rpc::WorkerType GetWorkerType() const override { return rpc::WorkerType::WORKER; }
int Port() const override { return port_; }
void SetOwnerAddress(const rpc::Address &address) override {}
void GrantLease(const RayLease &granted_lease) override {}
void GrantLeaseId(const LeaseID &lease_id) override { lease_id_ = lease_id; }
const RayLease &GetGrantedLease() const override { return granted_lease_; }
absl::Time GetGrantedLeaseTime() const override { return absl::InfiniteFuture(); }
std::optional<bool> GetIsGpu() const override { return std::nullopt; }
std::optional<bool> GetIsActorWorker() const override { return std::nullopt; }
const std::string IpAddress() const override { return "127.0.0.1"; }
void AsyncNotifyGCSRestart() override {}
void SetAllocatedInstances(
const std::shared_ptr<TaskResourceInstances> &allocated_instances) override {}
void SetLifetimeAllocatedInstances(
const std::shared_ptr<TaskResourceInstances> &allocated_instances) override {}
std::shared_ptr<TaskResourceInstances> GetAllocatedInstances() override {
return nullptr;
}
std::shared_ptr<TaskResourceInstances> GetLifetimeAllocatedInstances() override {
return nullptr;
}
void MarkDead() override {}
bool IsDead() const override { return false; }
void KillAsync(instrumented_io_context &io_service, bool force) override {}
void MarkBlocked() override {}
void MarkUnblocked() override {}
bool IsBlocked() const override { return false; }
Process GetProcess() const override { return proc_; }
StartupToken GetStartupToken() const override { return 0; }
void SetProcess(Process proc) override {}
Language GetLanguage() const override { return Language::PYTHON; }
void Connect(int port) override {}
void Connect(std::shared_ptr<rpc::CoreWorkerClientInterface> rpc_client) override {}
int AssignedPort() const override { return -1; }
void SetAssignedPort(int port) override {}
const LeaseID &GetGrantedLeaseId() const override { return lease_id_; }
const JobID &GetAssignedJobId() const override { return job_id_; }
int GetRuntimeEnvHash() const override { return 0; }
void AssignActorId(const ActorID &actor_id) override {}
const ActorID &GetActorId() const override { return actor_id_; }
const std::string GetLeaseIdAsDebugString() const override { return ""; }
bool IsDetachedActor() const override { return false; }
const std::shared_ptr<ClientConnection> Connection() const override {
return connection_;
}
const rpc::Address &GetOwnerAddress() const override { return owner_address_; }
std::optional<pid_t> GetSavedProcessGroupId() const override { return std::nullopt; }
void SetSavedProcessGroupId(pid_t pgid) override {}
void ActorCallArgWaitComplete(int64_t tag) override {}
void ClearAllocatedInstances() override {}
void ClearLifetimeAllocatedInstances() override {}
const BundleID &GetBundleId() const override { return bundle_id_; }
void SetBundleId(const BundleID &bundle_id) override { bundle_id_ = bundle_id; }
RayLease &GetGrantedLease() override { return granted_lease_; }
bool IsRegistered() override { return false; }
rpc::CoreWorkerClientInterface *rpc_client() override { return nullptr; }
bool IsAvailableForScheduling() const override { return true; }
void SetJobId(const JobID &job_id) override {}
const ActorID &GetRootDetachedActorId() const override {
return root_detached_actor_id_;
}

protected:
void SetStartupToken(StartupToken startup_token) override {}

private:
WorkerID worker_id_;
int port_;
LeaseID lease_id_;
BundleID bundle_id_;
Process proc_;
std::shared_ptr<ClientConnection> connection_;
RayLease granted_lease_;
JobID job_id_;
ActorID actor_id_;
rpc::Address owner_address_;
ActorID root_detached_actor_id_;
};

} // namespace raylet
} // namespace ray
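For orientation, a minimal usage sketch (hypothetical: the test name, assertions, and include paths below are assumptions, not part of this PR):

// fake_worker_usage_test.cc: sketch only, exercising the FakeWorker surface.
#include <memory>

#include "gtest/gtest.h"
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/raylet/fake_worker.h"

TEST(FakeWorkerTest, ProvidesWorkerInterfaceSurface) {
  instrumented_io_context io_context;
  // The constructor opens a real (unconnected) local socket solely to satisfy
  // ClientConnection::Create; its message/error handlers are no-op lambdas.
  auto worker = std::make_shared<ray::raylet::FakeWorker>(
      ray::WorkerID::FromRandom(), /*port=*/1234, io_context);
  EXPECT_EQ(worker->Port(), 1234);
  EXPECT_EQ(worker->IpAddress(), "127.0.0.1");
  EXPECT_TRUE(worker->IsAvailableForScheduling());
}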
8 changes: 8 additions & 0 deletions src/ray/raylet/main.cc
@@ -330,6 +330,8 @@ int main(int argc, char *argv[]) {

ray::stats::Gauge task_by_state_counter = ray::core::GetTaskByStateGaugeMetric();
std::shared_ptr<plasma::PlasmaClient> plasma_client;
std::unique_ptr<ray::raylet::PlacementGroupResourceManager>
placement_group_resource_manager;
std::unique_ptr<ray::raylet::NodeManager> node_manager;
std::unique_ptr<ray::rpc::ClientCallManager> client_call_manager;
std::unique_ptr<ray::rpc::CoreWorkerClientPool> worker_rpc_pool;
@@ -916,6 +918,11 @@
main_service, ray::ParseUrlEndpoint(raylet_socket_name));
ray::local_stream_socket socket(main_service);
ray::SetCloseOnExec(acceptor);

placement_group_resource_manager =
std::make_unique<ray::raylet::NewPlacementGroupResourceManager>(
*cluster_resource_scheduler);

node_manager = std::make_unique<ray::raylet::NodeManager>(
main_service,
raylet_node_id,
@@ -944,6 +951,7 @@
std::move(add_process_to_system_cgroup_hook),
std::move(cgroup_manager),
shutting_down,
*placement_group_resource_manager,
std::move(acceptor),
std::move(socket));

13 changes: 6 additions & 7 deletions src/ray/raylet/node_manager.cc
@@ -177,6 +177,7 @@ NodeManager::NodeManager(
AddProcessToCgroupHook add_process_to_system_cgroup_hook,
std::unique_ptr<CgroupManagerInterface> cgroup_manager,
std::atomic_bool &shutting_down,
PlacementGroupResourceManager &placement_group_resource_manager,
boost::asio::basic_socket_acceptor<local_stream_protocol> acceptor,
local_stream_socket socket)
: self_node_id_(self_node_id),
@@ -221,6 +222,7 @@
local_lease_manager_(local_lease_manager),
cluster_lease_manager_(cluster_lease_manager),
record_metrics_period_ms_(config.record_metrics_period_ms),
placement_group_resource_manager_(placement_group_resource_manager),
next_resource_seq_no_(0),
ray_syncer_(io_service_, self_node_id_.Binary()),
worker_killing_policy_(std::make_shared<GroupByOwnerIdWorkerKillingPolicy>()),
@@ -237,9 +239,6 @@
socket_(std::move(socket)) {
RAY_LOG(INFO).WithField(kLogKeyNodeID, self_node_id_) << "Initializing NodeManager";

placement_group_resource_manager_ =
std::make_unique<NewPlacementGroupResourceManager>(cluster_resource_scheduler_);

periodical_runner_->RunFnPeriodically(
[this]() { cluster_lease_manager_.ScheduleAndGrantLeases(); },
RayConfig::instance().worker_cap_initial_backoff_delay_ms(),
@@ -674,7 +673,7 @@ void NodeManager::HandleReleaseUnusedBundles(rpc::ReleaseUnusedBundlesRequest request,
}

// Return unused bundle resources.
placement_group_resource_manager_->ReturnUnusedBundle(in_use_bundles);
placement_group_resource_manager_.ReturnUnusedBundle(in_use_bundles);

send_reply_callback(Status::OK(), nullptr, nullptr);
}
@@ -1927,7 +1926,7 @@ void NodeManager::HandlePrepareBundleResources(
}
RAY_LOG(DEBUG) << "Request to prepare resources for bundles: "
<< GetDebugStringForBundles(bundle_specs);
auto prepared = placement_group_resource_manager_->PrepareBundles(bundle_specs);
auto prepared = placement_group_resource_manager_.PrepareBundles(bundle_specs);
reply->set_success(prepared);
send_reply_callback(Status::OK(), nullptr, nullptr);
}
@@ -1943,7 +1942,7 @@
}
RAY_LOG(DEBUG) << "Request to commit resources for bundles: "
<< GetDebugStringForBundles(bundle_specs);
placement_group_resource_manager_->CommitBundles(bundle_specs);
placement_group_resource_manager_.CommitBundles(bundle_specs);
send_reply_callback(Status::OK(), nullptr, nullptr);

cluster_lease_manager_.ScheduleAndGrantLeases();
@@ -1997,7 +1996,7 @@ void NodeManager::HandleCancelResourceReserve(
DestroyWorker(worker, rpc::WorkerExitType::INTENDED_SYSTEM_EXIT, message);
}

RAY_CHECK_OK(placement_group_resource_manager_->ReturnBundle(bundle_spec));
RAY_CHECK_OK(placement_group_resource_manager_.ReturnBundle(bundle_spec));
cluster_lease_manager_.ScheduleAndGrantLeases();
send_reply_callback(Status::OK(), nullptr, nullptr);
}
12 changes: 6 additions & 6 deletions src/ray/raylet/node_manager.h
@@ -164,6 +164,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
AddProcessToCgroupHook add_process_to_system_cgroup_hook,
std::unique_ptr<CgroupManagerInterface> cgroup_manager,
std::atomic_bool &shutting_down,
PlacementGroupResourceManager &placement_group_resource_manager,
boost::asio::basic_socket_acceptor<local_stream_protocol> acceptor,
local_stream_socket socket);

@@ -294,6 +295,10 @@
rpc::CancelWorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

void HandleReleaseUnusedBundles(rpc::ReleaseUnusedBundlesRequest request,
rpc::ReleaseUnusedBundlesReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

void HandleDrainRaylet(rpc::DrainRayletRequest request,
rpc::DrainRayletReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
@@ -615,11 +620,6 @@
rpc::FormatGlobalMemoryInfoReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle a `ReleaseUnusedBundles` request.
void HandleReleaseUnusedBundles(rpc::ReleaseUnusedBundlesRequest request,
rpc::ReleaseUnusedBundlesReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle a `GetSystemConfig` request.
void HandleGetSystemConfig(rpc::GetSystemConfigRequest request,
rpc::GetSystemConfigReply *reply,
@@ -874,7 +874,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
uint64_t number_workers_killed_ = 0;

/// Managers all bundle-related operations.
std::unique_ptr<PlacementGroupResourceManager> placement_group_resource_manager_;
PlacementGroupResourceManager &placement_group_resource_manager_;

/// Next resource broadcast seq no. Non-incrementing sequence numbers
/// indicate network issues (dropped/duplicated/ooo packets, etc).
3 changes: 3 additions & 0 deletions src/ray/raylet/placement_group_resource_manager.h
@@ -83,6 +83,9 @@ class PlacementGroupResourceManager {
/// Save `BundleSpecification` for cleaning leaked bundles after GCS restart.
absl::flat_hash_map<BundleID, std::shared_ptr<BundleSpecification>, pair_hash>
bundle_spec_map_;

friend bool IsBundleRegistered(const PlacementGroupResourceManager &manager,
const BundleID &bundle_id);
Contributor: 👀 no way around? + is it necessary to assert on this

Contributor Author (Sparks0219, Oct 16, 2025): mmm I think we need some way of looking into pgmanager state since ReleaseUnusedBundles is calling pgmanager methods and I don't think there's anything I can use for this in the class currently :(

};

/// Associated with new scheduler.
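For reference, a plausible definition of that test-only friend (an assumption: the real helper lives in the raylet test sources, not in this header):

// Test-only accessor. Declared as a friend above so tests can check whether a
// bundle is still tracked in the manager's private bundle_spec_map_.
bool IsBundleRegistered(const PlacementGroupResourceManager &manager,
                        const BundleID &bundle_id) {
  return manager.bundle_spec_map_.contains(bundle_id);
}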
1 change: 1 addition & 0 deletions src/ray/raylet/tests/BUILD.bazel
@@ -162,6 +162,7 @@ ray_cc_test(
"//src/ray/object_manager/plasma:plasma_client",
"//src/ray/observability:fake_metric",
"//src/ray/pubsub:fake_subscriber",
"//src/ray/raylet:fake_worker",
"//src/ray/raylet:local_object_manager_interface",
"//src/ray/raylet:raylet_lib",
"//src/ray/raylet/scheduling:cluster_lease_manager",