
Commit 11d7ad4

[core] remove cluster_full_of_actors_detected_* as autoscaler v2 doesn't use them (#59052)
This PR removes the `cluster_full_of_actors_detected` and `cluster_full_of_actors_detected_by_gcs` fields from the protobuf definitions: they are not used by autoscaler v2, and autoscaler v1 is scheduled for deletion soon. Both fields are considered private, so they are deleted without maintaining backward compatibility.

Signed-off-by: Rueian <rueiancsie@gmail.com>
1 parent: cbfbfa5 · commit: 11d7ad4

12 files changed (+10, -167 lines)

python/ray/autoscaler/_private/autoscaler.py

Lines changed: 1 addition & 16 deletions
@@ -817,7 +817,6 @@ def _report_pending_infeasible(self, unfulfilled: List[ResourceDict]):
         """
         # For type checking, assert that this object has been instantitiated.
         assert self.resource_demand_scheduler
-        pending = []
         infeasible = []
         for bundle in unfulfilled:
             placement_group = any(
@@ -826,22 +825,8 @@ def _report_pending_infeasible(self, unfulfilled: List[ResourceDict]):
             )
             if placement_group:
                 continue
-            if self.resource_demand_scheduler.is_feasible(bundle):
-                pending.append(bundle)
-            else:
+            if not self.resource_demand_scheduler.is_feasible(bundle):
                 infeasible.append(bundle)
-        if pending:
-            if self.load_metrics.cluster_full_of_actors_detected:
-                for request in pending:
-                    self.event_summarizer.add_once_per_interval(
-                        "Warning: The following resource request cannot be "
-                        "scheduled right now: {}. This is likely due to all "
-                        "cluster resources being claimed by actors. Consider "
-                        "creating fewer actors or adding more nodes "
-                        "to this Ray cluster.".format(request),
-                        key="pending_{}".format(sorted(request.items())),
-                        interval_s=30,
-                    )
         if infeasible:
             for request in infeasible:
                 self.event_summarizer.add_once_per_interval(
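
With the `pending` bookkeeping and the `load_metrics.cluster_full_of_actors_detected` check gone, `_report_pending_infeasible` now only warns about bundles the scheduler reports as infeasible. A minimal standalone sketch of that post-change control flow (the callables and the warning text below are illustrative stand-ins, not Ray's actual objects or messages):

# Illustrative sketch (not Ray's actual autoscaler code): placement-group
# bundles are skipped, and only bundles the scheduler deems infeasible produce
# a periodic warning; there is no separate "pending" path anymore.
from typing import Callable, Dict, List

ResourceDict = Dict[str, float]


def report_infeasible(
    unfulfilled: List[ResourceDict],
    is_placement_group_bundle: Callable[[ResourceDict], bool],
    is_feasible: Callable[[ResourceDict], bool],
    warn_once_per_interval: Callable[..., None],
) -> None:
    infeasible = []
    for bundle in unfulfilled:
        if is_placement_group_bundle(bundle):
            continue
        if not is_feasible(bundle):
            infeasible.append(bundle)
    for request in infeasible:
        warn_once_per_interval(
            # Placeholder text; the real warning string lives in autoscaler.py.
            "Infeasible resource request: {}".format(request),
            key="infeasible_{}".format(sorted(request.items())),
            interval_s=30,
        )


# Example usage with trivial stand-ins for the scheduler and event summarizer.
report_infeasible(
    unfulfilled=[{"CPU": 64.0}],
    is_placement_group_bundle=lambda b: False,
    is_feasible=lambda b: False,
    warn_once_per_interval=lambda msg, key, interval_s: print(msg),
)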

python/ray/autoscaler/_private/load_metrics.py

Lines changed: 0 additions & 3 deletions
@@ -78,7 +78,6 @@ def __init__(self):
         self.infeasible_bundles = []
         self.pending_placement_groups = []
         self.resource_requests = []
-        self.cluster_full_of_actors_detected = False
         self.ray_nodes_last_used_time_by_ip = {}

     def __bool__(self):
@@ -97,11 +96,9 @@ def update(
         waiting_bundles: List[Dict[str, float]] = None,
         infeasible_bundles: List[Dict[str, float]] = None,
         pending_placement_groups: List[PlacementGroupTableData] = None,
-        cluster_full_of_actors_detected: bool = False,
     ):
         self.static_resources_by_ip[ip] = static_resources
         self.node_id_by_ip[ip] = node_id
-        self.cluster_full_of_actors_detected = cluster_full_of_actors_detected

         if not waiting_bundles:
             waiting_bundles = []
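
After this change `LoadMetrics` neither stores the flag nor accepts it in `update()`. A rough, self-contained sketch of the trimmed surface (a simplified stand-in class, not the real `ray.autoscaler._private.load_metrics.LoadMetrics`; the placement-group argument is omitted for brevity):

# Simplified stand-in, not the real LoadMetrics class; argument names follow
# the diff above.
from typing import Dict, List, Optional


class LoadMetricsSketch:
    def __init__(self) -> None:
        self.static_resources_by_ip: Dict[str, Dict[str, float]] = {}
        self.node_id_by_ip: Dict[str, str] = {}
        self.waiting_bundles: List[Dict[str, float]] = []
        self.infeasible_bundles: List[Dict[str, float]] = []
        # Note: no cluster_full_of_actors_detected attribute anymore.

    def update(
        self,
        ip: str,
        node_id: str,
        static_resources: Dict[str, float],
        waiting_bundles: Optional[List[Dict[str, float]]] = None,
        infeasible_bundles: Optional[List[Dict[str, float]]] = None,
        # The cluster_full_of_actors_detected keyword is gone; a caller that
        # still passes it now gets a TypeError.
    ) -> None:
        self.static_resources_by_ip[ip] = static_resources
        self.node_id_by_ip[ip] = node_id
        self.waiting_bundles = waiting_bundles or []
        self.infeasible_bundles = infeasible_bundles or []


lm = LoadMetricsSketch()
lm.update("10.0.0.1", "node-1", {"CPU": 8.0}, waiting_bundles=[{"CPU": 1.0}])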

python/ray/autoscaler/_private/monitor.py

Lines changed: 0 additions & 14 deletions
@@ -268,19 +268,6 @@ def update_load_metrics(self):
             self.autoscaler.provider._set_nodes(new_nodes)

         mirror_node_types = {}
-        legacy_cluster_full_detected = any(
-            getattr(entry, "cluster_full_of_actors_detected", False)
-            for entry in resources_batch_data.batch
-        )
-        cluster_full = legacy_cluster_full_detected or getattr(
-            response, "cluster_full_of_actors_detected_by_gcs", False
-        )
-        if (
-            hasattr(response, "cluster_full_of_actors_detected_by_gcs")
-            and response.cluster_full_of_actors_detected_by_gcs
-        ):
-            # GCS has detected the cluster full of actors.
-            cluster_full = True
         for resource_message in cluster_resource_state.node_states:
             node_id = resource_message.node_id
             # Generate node type config based on GCS reported node list.
@@ -341,7 +328,6 @@ def update_load_metrics(self):
             waiting_bundles,
             infeasible_bundles,
             pending_placement_groups,
-            cluster_full,
         )
         if self.readonly_config:
             self.readonly_config["available_node_types"].update(mirror_node_types)
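
On the monitor side, `update_load_metrics()` no longer derives a `cluster_full` value from the `GetAllResourceUsage` reply (or from the per-node batch entries) and no longer forwards it to `load_metrics.update()`. A hedged, self-contained sketch of the caller-side difference (the stub class and literal values below are hypothetical, not the real monitor code):

# Illustrative sketch of the post-change call shape in update_load_metrics();
# FakeLoadMetrics only mimics the trailing demand arguments shown in the diff.
from typing import Any, Dict, List


class FakeLoadMetrics:
    def update(
        self,
        waiting_bundles: List[Dict[str, float]],
        infeasible_bundles: List[Dict[str, float]],
        pending_placement_groups: List[Any],
    ) -> None:
        print(len(waiting_bundles), "waiting,", len(infeasible_bundles), "infeasible")


def update_load_metrics_sketch(load_metrics: FakeLoadMetrics) -> None:
    # The block that computed `cluster_full` from the reply is gone entirely;
    # only demand information is forwarded now.
    waiting_bundles = [{"CPU": 1.0}]  # hypothetical demand
    infeasible_bundles: List[Dict[str, float]] = []
    pending_placement_groups: List[Any] = []
    load_metrics.update(
        waiting_bundles,
        infeasible_bundles,
        pending_placement_groups,
        # cluster_full,  <- trailing argument removed by this PR
    )


update_load_metrics_sketch(FakeLoadMetrics())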

python/ray/tests/test_output.py

Lines changed: 0 additions & 56 deletions
@@ -14,7 +14,6 @@
     run_string_as_driver_nonblocking,
     run_string_as_driver_stdout_stderr,
 )
-from ray.autoscaler.v2.utils import is_autoscaler_v2


 def test_dedup_logs():
@@ -221,61 +220,6 @@ def _check_for_infeasible_msg():
     proc.wait()


-@pytest.mark.parametrize(
-    "ray_start_cluster_head_with_env_vars",
-    [
-        {
-            "num_cpus": 1,
-            "env_vars": {
-                "RAY_enable_autoscaler_v2": "0",
-                "RAY_debug_dump_period_milliseconds": "1000",
-            },
-        },
-        {
-            "num_cpus": 1,
-            "env_vars": {
-                "RAY_enable_autoscaler_v2": "1",
-                "RAY_debug_dump_period_milliseconds": "1000",
-            },
-        },
-    ],
-    indirect=True,
-)
-def test_autoscaler_warn_deadlock(ray_start_cluster_head_with_env_vars):
-    script = """
-import ray
-import time
-
-@ray.remote(num_cpus=1)
-class A:
-    pass
-
-ray.init(address="{address}")
-
-# Only one of a or b can be scheduled, so the other will hang.
-a = A.remote()
-b = A.remote()
-ray.get([a.__ray_ready__.remote(), b.__ray_ready__.remote()])
-""".format(
-        address=ray_start_cluster_head_with_env_vars.address
-    )
-
-    proc = run_string_as_driver_nonblocking(script, env={"PYTHONUNBUFFERED": "1"})
-
-    if is_autoscaler_v2():
-        infeasible_msg = "No available node types can fulfill resource requests"
-    else:
-        infeasible_msg = "Warning: The following resource request cannot"
-
-    def _check_for_deadlock_msg():
-        l = proc.stdout.readline().decode("ascii")
-        if len(l) > 0:
-            print(l)
-        return "(autoscaler" in l and infeasible_msg in l
-
-    wait_for_condition(_check_for_deadlock_msg, timeout=30)
-
-
 @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
 def test_autoscaler_v2_stream_events_with_filter(shutdown_only):
     """Test that autoscaler v2 events are streamed to the driver."""

src/ray/gcs/gcs_resource_manager.cc

Lines changed: 1 addition & 23 deletions
@@ -47,11 +47,7 @@ void GcsResourceManager::ConsumeSyncMessage(
   io_context_.dispatch(
       [this, message]() {
         if (message->message_type() == syncer::MessageType::COMMANDS) {
-          syncer::CommandsSyncMessage commands_sync_message;
-          commands_sync_message.ParseFromString(message->sync_message());
-          UpdateClusterFullOfActorsDetected(
-              NodeID::FromBinary(message->node_id()),
-              commands_sync_message.cluster_full_of_actors_detected());
+          // COMMANDS channel is currently unused.
         } else if (message->message_type() == syncer::MessageType::RESOURCE_VIEW) {
           syncer::ResourceViewSyncMessage resource_view_sync_message;
           resource_view_sync_message.ParseFromString(message->sync_message());
@@ -206,12 +202,6 @@ void GcsResourceManager::HandleGetAllResourceUsage(
     cluster_lease_manager_->FillPendingActorInfo(gcs_resources_data);
     // Aggregate the load (pending actor info) of gcs.
    FillAggregateLoad(gcs_resources_data, &aggregate_load);
-    // We only export gcs's pending info without adding the corresponding
-    // `ResourcesData` to the `batch` list. So if gcs has detected cluster full of
-    // actors, set the dedicated field in reply.
-    if (gcs_resources_data.cluster_full_of_actors_detected()) {
-      reply->set_cluster_full_of_actors_detected_by_gcs(true);
-    }
   }

   for (const auto &demand : aggregate_load) {
@@ -244,18 +234,6 @@ void GcsResourceManager::HandleGetAllResourceUsage(
   ++counts_[CountType::GET_ALL_RESOURCE_USAGE_REQUEST];
 }

-void GcsResourceManager::UpdateClusterFullOfActorsDetected(
-    const NodeID &node_id, bool cluster_full_of_actors_detected) {
-  auto iter = node_resource_usages_.find(node_id);
-  if (iter == node_resource_usages_.end()) {
-    return;
-  }
-
-  // TODO(rickyx): We should change this to be part of RESOURCE_VIEW.
-  // This is being populated from NodeManager as part of COMMANDS
-  iter->second.set_cluster_full_of_actors_detected(cluster_full_of_actors_detected);
-}
-
 void GcsResourceManager::UpdateNodeResourceUsage(
     const NodeID &node_id,
     const syncer::ResourceViewSyncMessage &resource_view_sync_message);

src/ray/gcs/gcs_resource_manager.h

Lines changed: 0 additions & 7 deletions
@@ -138,13 +138,6 @@ class GcsResourceManager : public rpc::NodeResourceInfoGcsServiceHandler,
       const NodeID &node_id,
       const syncer::ResourceViewSyncMessage &resource_view_sync_message);

-  /// Update the resource usage of a node from syncer COMMANDS
-  ///
-  /// This is currently used for setting cluster full of actors info from syncer.
-  /// \param data The resource report.
-  void UpdateClusterFullOfActorsDetected(const NodeID &node_id,
-                                         bool cluster_full_of_actors_detected);
-
   /// Update the placement group load information so that it will be reported through
   /// heartbeat.
   ///

src/ray/gcs/tests/gcs_resource_manager_test.cc

Lines changed: 7 additions & 36 deletions
@@ -149,42 +149,13 @@ TEST_F(GcsResourceManagerTest, TestResourceUsageFromDifferentSyncMsgs) {
   resource_view_sync_message.mutable_resources_available()->insert({"CPU", 5});

   // Update resource usage from resource view.
-  {
-    ASSERT_FALSE(gcs_resource_manager_->NodeResourceReportView()
-                     .at(NodeID::FromBinary(node->node_id()))
-                     .cluster_full_of_actors_detected());
-    gcs_resource_manager_->UpdateFromResourceView(NodeID::FromBinary(node->node_id()),
-                                                  resource_view_sync_message);
-    ASSERT_EQ(
-        cluster_resource_manager_.GetNodeResources(scheduling::NodeID(node->node_id()))
-            .total.GetResourceMap()
-            .at("CPU"),
-        5);
-
-    ASSERT_FALSE(gcs_resource_manager_->NodeResourceReportView()
-                     .at(NodeID::FromBinary(node->node_id()))
-                     .cluster_full_of_actors_detected());
-  }
-
-  // Update from syncer COMMANDS will not update the resources, but the
-  // cluster_full_of_actors_detected flag. (This is how NodeManager currently
-  // updates potential resources deadlock).
-  {
-    gcs_resource_manager_->UpdateClusterFullOfActorsDetected(
-        NodeID::FromBinary(node->node_id()), true);
-
-    // Still 5 because the syncer COMMANDS message is ignored.
-    ASSERT_EQ(
-        cluster_resource_manager_.GetNodeResources(scheduling::NodeID(node->node_id()))
-            .total.GetResourceMap()
-            .at("CPU"),
-        5);
-
-    // The flag is updated.
-    ASSERT_TRUE(gcs_resource_manager_->NodeResourceReportView()
-                    .at(NodeID::FromBinary(node->node_id()))
-                    .cluster_full_of_actors_detected());
-  }
+  gcs_resource_manager_->UpdateFromResourceView(NodeID::FromBinary(node->node_id()),
+                                                resource_view_sync_message);
+  ASSERT_EQ(
+      cluster_resource_manager_.GetNodeResources(scheduling::NodeID(node->node_id()))
+          .total.GetResourceMap()
+          .at("CPU"),
+      5);
 }

 TEST_F(GcsResourceManagerTest, TestSetAvailableResourcesWhenNodeDead) {

src/ray/protobuf/gcs.proto

Lines changed: 1 addition & 3 deletions
@@ -506,7 +506,7 @@ message ResourceLoad {
 }

 message ResourcesData {
-  reserved 3, 8, 17;
+  reserved 3, 8, 14, 17;
   // Node id.
   bytes node_id = 1;
   // Resource capacity currently available on this node manager.
@@ -530,8 +530,6 @@ message ResourcesData {
   bool resources_normal_task_changed = 12;
   // The timestamp that normal task resources are measured.
   int64 resources_normal_task_timestamp = 13;
-  // Whether this node has detected a resource deadlock (full of actors).
-  bool cluster_full_of_actors_detected = 14;
   // The duration in ms during which all the node's resources are idle. If the
   // node currently has any resource being used, this will be 0.
   int64 idle_duration_ms = 15;

src/ray/protobuf/gcs_service.proto

Lines changed: 0 additions & 4 deletions
@@ -723,10 +723,6 @@ message GetAllResourceUsageRequest {}
 message GetAllResourceUsageReply {
   GcsStatus status = 1;
   ResourceUsageBatchData resource_usage_data = 2;
-  /// True if gcs finds infeasible or pending actor creation tasks
-  /// locally (when gcs actor scheduler is enabled). This field is
-  /// expected to help triggering auto-scaling.
-  bool cluster_full_of_actors_detected_by_gcs = 3;
 }

 message GetDrainingNodesRequest {}

src/ray/protobuf/ray_syncer.proto

Lines changed: 0 additions & 2 deletions
@@ -23,8 +23,6 @@ enum MessageType {
 message CommandsSyncMessage {
   // Whether this node manager is requesting global GC.
   bool should_global_gc = 1;
-  // Whether this node has detected a resource deadlock (full of actors).
-  bool cluster_full_of_actors_detected = 2;
 }

 message ResourceViewSyncMessage {
