diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py
index 2051977bf655..978f43626a2a 100644
--- a/python/ray/autoscaler/_private/autoscaler.py
+++ b/python/ray/autoscaler/_private/autoscaler.py
@@ -817,7 +817,6 @@ def _report_pending_infeasible(self, unfulfilled: List[ResourceDict]):
         """
         # For type checking, assert that this object has been instantitiated.
         assert self.resource_demand_scheduler
-        pending = []
         infeasible = []
         for bundle in unfulfilled:
             placement_group = any(
@@ -826,22 +825,8 @@ def _report_pending_infeasible(self, unfulfilled: List[ResourceDict]):
             )
             if placement_group:
                 continue
-            if self.resource_demand_scheduler.is_feasible(bundle):
-                pending.append(bundle)
-            else:
+            if not self.resource_demand_scheduler.is_feasible(bundle):
                 infeasible.append(bundle)
-        if pending:
-            if self.load_metrics.cluster_full_of_actors_detected:
-                for request in pending:
-                    self.event_summarizer.add_once_per_interval(
-                        "Warning: The following resource request cannot be "
-                        "scheduled right now: {}. This is likely due to all "
-                        "cluster resources being claimed by actors. Consider "
-                        "creating fewer actors or adding more nodes "
-                        "to this Ray cluster.".format(request),
-                        key="pending_{}".format(sorted(request.items())),
-                        interval_s=30,
-                    )
         if infeasible:
             for request in infeasible:
                 self.event_summarizer.add_once_per_interval(
diff --git a/python/ray/autoscaler/_private/load_metrics.py b/python/ray/autoscaler/_private/load_metrics.py
index ec94647bda9c..18aa4e318f04 100644
--- a/python/ray/autoscaler/_private/load_metrics.py
+++ b/python/ray/autoscaler/_private/load_metrics.py
@@ -78,7 +78,6 @@ def __init__(self):
         self.infeasible_bundles = []
         self.pending_placement_groups = []
         self.resource_requests = []
-        self.cluster_full_of_actors_detected = False
         self.ray_nodes_last_used_time_by_ip = {}
 
     def __bool__(self):
@@ -97,11 +96,9 @@ def update(
         waiting_bundles: List[Dict[str, float]] = None,
         infeasible_bundles: List[Dict[str, float]] = None,
         pending_placement_groups: List[PlacementGroupTableData] = None,
-        cluster_full_of_actors_detected: bool = False,
     ):
         self.static_resources_by_ip[ip] = static_resources
         self.node_id_by_ip[ip] = node_id
-        self.cluster_full_of_actors_detected = cluster_full_of_actors_detected
 
         if not waiting_bundles:
             waiting_bundles = []
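Reviewer note: with cluster_full_of_actors_detected removed from both __init__ and update(), LoadMetrics now carries only bundle and placement-group demand. A minimal self-contained sketch of the resulting shape — the class name, trimmed parameter list, and types below are illustrative stand-ins, not the full Ray signature:

    from typing import Dict, List, Optional

    class LoadMetricsSketch:
        """Toy stand-in for LoadMetrics after this diff: the
        cluster_full_of_actors_detected field and update() kwarg are gone."""

        def __init__(self):
            self.static_resources_by_ip: Dict[str, Dict[str, float]] = {}
            self.node_id_by_ip: Dict[str, bytes] = {}
            self.waiting_bundles: List[Dict[str, float]] = []
            self.infeasible_bundles: List[Dict[str, float]] = []

        def update(
            self,
            ip: str,
            node_id: bytes,
            static_resources: Dict[str, float],
            waiting_bundles: Optional[List[Dict[str, float]]] = None,
            infeasible_bundles: Optional[List[Dict[str, float]]] = None,
        ) -> None:
            self.static_resources_by_ip[ip] = static_resources
            self.node_id_by_ip[ip] = node_id
            self.waiting_bundles = waiting_bundles or []
            self.infeasible_bundles = infeasible_bundles or []

The monitor-side call site (next file) shrinks to match: the cluster_full positional argument disappears from load_metrics.update(...).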
diff --git a/python/ray/autoscaler/_private/monitor.py b/python/ray/autoscaler/_private/monitor.py
index a1ac85942b85..4507a92feff1 100644
--- a/python/ray/autoscaler/_private/monitor.py
+++ b/python/ray/autoscaler/_private/monitor.py
@@ -268,19 +268,6 @@ def update_load_metrics(self):
             self.autoscaler.provider._set_nodes(new_nodes)
 
         mirror_node_types = {}
-        legacy_cluster_full_detected = any(
-            getattr(entry, "cluster_full_of_actors_detected", False)
-            for entry in resources_batch_data.batch
-        )
-        cluster_full = legacy_cluster_full_detected or getattr(
-            response, "cluster_full_of_actors_detected_by_gcs", False
-        )
-        if (
-            hasattr(response, "cluster_full_of_actors_detected_by_gcs")
-            and response.cluster_full_of_actors_detected_by_gcs
-        ):
-            # GCS has detected the cluster full of actors.
-            cluster_full = True
         for resource_message in cluster_resource_state.node_states:
             node_id = resource_message.node_id
             # Generate node type config based on GCS reported node list.
@@ -341,7 +328,6 @@ def update_load_metrics(self):
                 waiting_bundles,
                 infeasible_bundles,
                 pending_placement_groups,
-                cluster_full,
             )
         if self.readonly_config:
             self.readonly_config["available_node_types"].update(mirror_node_types)
diff --git a/python/ray/tests/test_output.py b/python/ray/tests/test_output.py
index a753040931bf..d6d4ae4f3a93 100644
--- a/python/ray/tests/test_output.py
+++ b/python/ray/tests/test_output.py
@@ -14,7 +14,6 @@
     run_string_as_driver_nonblocking,
     run_string_as_driver_stdout_stderr,
 )
-from ray.autoscaler.v2.utils import is_autoscaler_v2
 
 
 def test_dedup_logs():
@@ -221,61 +220,6 @@ def _check_for_infeasible_msg():
     proc.wait()
 
 
-@pytest.mark.parametrize(
-    "ray_start_cluster_head_with_env_vars",
-    [
-        {
-            "num_cpus": 1,
-            "env_vars": {
-                "RAY_enable_autoscaler_v2": "0",
-                "RAY_debug_dump_period_milliseconds": "1000",
-            },
-        },
-        {
-            "num_cpus": 1,
-            "env_vars": {
-                "RAY_enable_autoscaler_v2": "1",
-                "RAY_debug_dump_period_milliseconds": "1000",
-            },
-        },
-    ],
-    indirect=True,
-)
-def test_autoscaler_warn_deadlock(ray_start_cluster_head_with_env_vars):
-    script = """
-import ray
-import time
-
-@ray.remote(num_cpus=1)
-class A:
-    pass
-
-ray.init(address="{address}")
-
-# Only one of a or b can be scheduled, so the other will hang.
-a = A.remote()
-b = A.remote()
-ray.get([a.__ray_ready__.remote(), b.__ray_ready__.remote()])
-    """.format(
-        address=ray_start_cluster_head_with_env_vars.address
-    )
-
-    proc = run_string_as_driver_nonblocking(script, env={"PYTHONUNBUFFERED": "1"})
-
-    if is_autoscaler_v2():
-        infeasible_msg = "No available node types can fulfill resource requests"
-    else:
-        infeasible_msg = "Warning: The following resource request cannot"
-
-    def _check_for_deadlock_msg():
-        l = proc.stdout.readline().decode("ascii")
-        if len(l) > 0:
-            print(l)
-        return "(autoscaler" in l and infeasible_msg in l
-
-    wait_for_condition(_check_for_deadlock_msg, timeout=30)
-
-
 @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
 def test_autoscaler_v2_stream_events_with_filter(shutdown_only):
     """Test that autoscaler v2 events are streamed to the driver."""
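Reviewer note: on the GCS side (next file), ConsumeSyncMessage stops parsing CommandsSyncMessage entirely, so RESOURCE_VIEW becomes the only syncer channel that mutates resource state. A rough Python mirror of the dispatch after this change — the message shapes are assumptions, with plain dicts standing in for the protobuf types:

    from enum import Enum
    from typing import Dict

    class MessageType(Enum):
        COMMANDS = 0
        RESOURCE_VIEW = 1

    def consume_sync_message(
        msg_type: MessageType,
        node_id: str,
        payload: Dict[str, float],
        node_resource_usages: Dict[str, Dict[str, float]],
    ) -> None:
        if msg_type is MessageType.COMMANDS:
            # COMMANDS is currently unused by the resource manager; nothing
            # is parsed and no per-node state is touched.
            return
        if msg_type is MessageType.RESOURCE_VIEW:
            # Analogue of UpdateFromResourceView: overwrite this node's view.
            node_resource_usages[node_id] = payload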
diff --git a/src/ray/gcs/gcs_resource_manager.cc b/src/ray/gcs/gcs_resource_manager.cc
index b9303b58ee55..aa5f8decfc7c 100644
--- a/src/ray/gcs/gcs_resource_manager.cc
+++ b/src/ray/gcs/gcs_resource_manager.cc
@@ -47,11 +47,7 @@ void GcsResourceManager::ConsumeSyncMessage(
   io_context_.dispatch(
       [this, message]() {
         if (message->message_type() == syncer::MessageType::COMMANDS) {
-          syncer::CommandsSyncMessage commands_sync_message;
-          commands_sync_message.ParseFromString(message->sync_message());
-          UpdateClusterFullOfActorsDetected(
-              NodeID::FromBinary(message->node_id()),
-              commands_sync_message.cluster_full_of_actors_detected());
+          // COMMANDS channel is currently unused.
         } else if (message->message_type() == syncer::MessageType::RESOURCE_VIEW) {
           syncer::ResourceViewSyncMessage resource_view_sync_message;
           resource_view_sync_message.ParseFromString(message->sync_message());
@@ -206,12 +202,6 @@ void GcsResourceManager::HandleGetAllResourceUsage(
       cluster_lease_manager_->FillPendingActorInfo(gcs_resources_data);
       // Aggregate the load (pending actor info) of gcs.
       FillAggregateLoad(gcs_resources_data, &aggregate_load);
-      // We only export gcs's pending info without adding the corresponding
-      // `ResourcesData` to the `batch` list. So if gcs has detected cluster full of
-      // actors, set the dedicated field in reply.
-      if (gcs_resources_data.cluster_full_of_actors_detected()) {
-        reply->set_cluster_full_of_actors_detected_by_gcs(true);
-      }
     }
 
     for (const auto &demand : aggregate_load) {
@@ -244,18 +234,6 @@ void GcsResourceManager::HandleGetAllResourceUsage(
   ++counts_[CountType::GET_ALL_RESOURCE_USAGE_REQUEST];
 }
 
-void GcsResourceManager::UpdateClusterFullOfActorsDetected(
-    const NodeID &node_id, bool cluster_full_of_actors_detected) {
-  auto iter = node_resource_usages_.find(node_id);
-  if (iter == node_resource_usages_.end()) {
-    return;
-  }
-
-  // TODO(rickyx): We should change this to be part of RESOURCE_VIEW.
-  // This is being populated from NodeManager as part of COMMANDS
-  iter->second.set_cluster_full_of_actors_detected(cluster_full_of_actors_detected);
-}
-
 void GcsResourceManager::UpdateNodeResourceUsage(
     const NodeID &node_id,
     const syncer::ResourceViewSyncMessage &resource_view_sync_message) {
diff --git a/src/ray/gcs/gcs_resource_manager.h b/src/ray/gcs/gcs_resource_manager.h
index 178bc91e1d59..7ff87766d04d 100644
--- a/src/ray/gcs/gcs_resource_manager.h
+++ b/src/ray/gcs/gcs_resource_manager.h
@@ -138,13 +138,6 @@ class GcsResourceManager : public rpc::NodeResourceInfoGcsServiceHandler,
       const NodeID &node_id,
       const syncer::ResourceViewSyncMessage &resource_view_sync_message);
 
-  /// Update the resource usage of a node from syncer COMMANDS
-  ///
-  /// This is currently used for setting cluster full of actors info from syncer.
-  /// \param data The resource report.
-  void UpdateClusterFullOfActorsDetected(const NodeID &node_id,
-                                         bool cluster_full_of_actors_detected);
-
   /// Update the placement group load information so that it will be reported through
   /// heartbeat.
   ///
diff --git a/src/ray/gcs/tests/gcs_resource_manager_test.cc b/src/ray/gcs/tests/gcs_resource_manager_test.cc
index a9732014aadd..f67530006b42 100644
--- a/src/ray/gcs/tests/gcs_resource_manager_test.cc
+++ b/src/ray/gcs/tests/gcs_resource_manager_test.cc
@@ -149,42 +149,13 @@ TEST_F(GcsResourceManagerTest, TestResourceUsageFromDifferentSyncMsgs) {
   resource_view_sync_message.mutable_resources_available()->insert({"CPU", 5});
 
   // Update resource usage from resource view.
-  {
-    ASSERT_FALSE(gcs_resource_manager_->NodeResourceReportView()
-                     .at(NodeID::FromBinary(node->node_id()))
-                     .cluster_full_of_actors_detected());
-    gcs_resource_manager_->UpdateFromResourceView(NodeID::FromBinary(node->node_id()),
-                                                  resource_view_sync_message);
-    ASSERT_EQ(
-        cluster_resource_manager_.GetNodeResources(scheduling::NodeID(node->node_id()))
-            .total.GetResourceMap()
-            .at("CPU"),
-        5);
-
-    ASSERT_FALSE(gcs_resource_manager_->NodeResourceReportView()
-                     .at(NodeID::FromBinary(node->node_id()))
-                     .cluster_full_of_actors_detected());
-  }
-
-  // Update from syncer COMMANDS will not update the resources, but the
-  // cluster_full_of_actors_detected flag. (This is how NodeManager currently
-  // updates potential resources deadlock).
-  {
-    gcs_resource_manager_->UpdateClusterFullOfActorsDetected(
-        NodeID::FromBinary(node->node_id()), true);
-
-    // Still 5 because the syncer COMMANDS message is ignored.
-    ASSERT_EQ(
-        cluster_resource_manager_.GetNodeResources(scheduling::NodeID(node->node_id()))
-            .total.GetResourceMap()
-            .at("CPU"),
-        5);
-
-    // The flag is updated.
-    ASSERT_TRUE(gcs_resource_manager_->NodeResourceReportView()
-                     .at(NodeID::FromBinary(node->node_id()))
-                     .cluster_full_of_actors_detected());
-  }
+  gcs_resource_manager_->UpdateFromResourceView(NodeID::FromBinary(node->node_id()),
+                                                resource_view_sync_message);
+  ASSERT_EQ(
+      cluster_resource_manager_.GetNodeResources(scheduling::NodeID(node->node_id()))
+          .total.GetResourceMap()
+          .at("CPU"),
+      5);
 }
 
 TEST_F(GcsResourceManagerTest, TestSetAvailableResourcesWhenNodeDead) {
diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto
index 351bd67c6a97..dec417f36fee 100644
--- a/src/ray/protobuf/gcs.proto
+++ b/src/ray/protobuf/gcs.proto
@@ -506,7 +506,7 @@ message ResourceLoad {
 }
 
 message ResourcesData {
-  reserved 3, 8, 17;
+  reserved 3, 8, 14, 17;
   // Node id.
   bytes node_id = 1;
   // Resource capacity currently available on this node manager.
@@ -530,8 +530,6 @@ message ResourcesData {
   bool resources_normal_task_changed = 12;
   // The timestamp that normal task resources are measured.
   int64 resources_normal_task_timestamp = 13;
-  // Whether this node has detected a resource deadlock (full of actors).
-  bool cluster_full_of_actors_detected = 14;
   // The duration in ms during which all the node's resources are idle. If the
   // node currently has any resource being used, this will be 0.
   int64 idle_duration_ms = 15;
diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto
index 4567578e8083..82df3b003d80 100644
--- a/src/ray/protobuf/gcs_service.proto
+++ b/src/ray/protobuf/gcs_service.proto
@@ -723,10 +723,6 @@ message GetAllResourceUsageRequest {}
 message GetAllResourceUsageReply {
   GcsStatus status = 1;
   ResourceUsageBatchData resource_usage_data = 2;
-  /// True if gcs finds infeasible or pending actor creation tasks
-  /// locally (when gcs actor scheduler is enabled). This field is
-  /// expected to help triggering auto-scaling.
-  bool cluster_full_of_actors_detected_by_gcs = 3;
 }
 
 message GetDrainingNodesRequest {}
diff --git a/src/ray/protobuf/ray_syncer.proto b/src/ray/protobuf/ray_syncer.proto
index e424b38cbf77..6ec15782b83a 100644
--- a/src/ray/protobuf/ray_syncer.proto
+++ b/src/ray/protobuf/ray_syncer.proto
@@ -23,8 +23,6 @@ enum MessageType {
 message CommandsSyncMessage {
   // Whether this node manager is requesting global GC.
   bool should_global_gc = 1;
-  // Whether this node has detected a resource deadlock (full of actors).
-  bool cluster_full_of_actors_detected = 2;
 }
 
 message ResourceViewSyncMessage {
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 6eff80a18bde..043a4711ef0c 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -3022,8 +3022,6 @@ std::optional<syncer::RaySyncMessage> NodeManager::CreateSyncMessage(
   std::string serialized_commands_sync_msg;
   syncer::CommandsSyncMessage commands_sync_message;
   commands_sync_message.set_should_global_gc(true);
-  commands_sync_message.set_cluster_full_of_actors_detected(resource_deadlock_warned_ >=
-                                                            1);
   RAY_CHECK(commands_sync_message.SerializeToString(&serialized_commands_sync_msg));
   // Populate the sync message.
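Reviewer note: in gcs.proto the dropped field keeps its tag via `reserved 3, 8, 14, 17`, so an old raylet that still serializes field 14 is silently ignored rather than misparsed, and the tag cannot be reused later with a different meaning. The raylet's COMMANDS payload (node_manager.cc above) now carries only the global-GC bit. A dataclass sketch of the slimmed message — the helper below is hypothetical, shown only to illustrate what the raylet still sends:

    from dataclasses import dataclass

    @dataclass
    class CommandsSyncMessage:
        # Mirrors ray_syncer.proto after this diff: field 1 only; the old
        # field 2 (cluster_full_of_actors_detected) no longer exists on the wire.
        should_global_gc: bool = False

    def create_commands_sync_message(trigger_global_gc: bool) -> CommandsSyncMessage:
        # Hypothetical helper: the raylet no longer folds resource_deadlock_warned_
        # into the sync message; requesting global GC is the only command left.
        return CommandsSyncMessage(should_global_gc=trigger_global_gc)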
diff --git a/src/ray/raylet/scheduling/scheduler_resource_reporter.cc b/src/ray/raylet/scheduling/scheduler_resource_reporter.cc
index ab5113d3ceac..b3ef78a658ba 100644
--- a/src/ray/raylet/scheduling/scheduler_resource_reporter.cc
+++ b/src/ray/raylet/scheduling/scheduler_resource_reporter.cc
@@ -180,7 +180,6 @@ void SchedulerResourceReporter::FillPendingActorCountByShape(
   }
 
   if (!pending_count_by_shape.empty()) {
-    data.set_cluster_full_of_actors_detected(true);
     auto resource_load_by_shape =
         data.mutable_resource_load_by_shape()->mutable_resource_demands();
     for (const auto &shape_entry : pending_count_by_shape) {
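Reviewer note: with the boolean gone, pending actor demand reaches the autoscaler only as per-shape load. A hedged sketch of the grouping that FillPendingActorCountByShape performs (shape aggregation as in the C++ above; the output field name is modeled on the ResourceDemand proto message and may not match it exactly):

    from collections import Counter
    from typing import Dict, List

    def pending_actor_demand_by_shape(
        pending_shapes: List[Dict[str, float]],
    ) -> List[Dict]:
        # Group pending actor-creation tasks by resource shape, e.g.
        # [{"CPU": 1.0}, {"CPU": 1.0}, {"GPU": 1.0}] -> two demand entries.
        counts = Counter(tuple(sorted(shape.items())) for shape in pending_shapes)
        return [
            {"shape": dict(shape), "num_ready_requests_queued": count}
            for shape, count in counts.items()
        ]

This lines up with the first file in this diff: since demand is expressed purely by shape, only genuinely infeasible shapes now produce a user-facing autoscaler warning.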