Merged
17 changes: 1 addition & 16 deletions python/ray/autoscaler/_private/autoscaler.py
@@ -817,7 +817,6 @@ def _report_pending_infeasible(self, unfulfilled: List[ResourceDict]):
"""
# For type checking, assert that this object has been instantiated.
assert self.resource_demand_scheduler
pending = []
infeasible = []
for bundle in unfulfilled:
placement_group = any(
@@ -826,22 +825,8 @@ def _report_pending_infeasible(self, unfulfilled: List[ResourceDict]):
)
if placement_group:
continue
if self.resource_demand_scheduler.is_feasible(bundle):
pending.append(bundle)
else:
if not self.resource_demand_scheduler.is_feasible(bundle):
infeasible.append(bundle)
if pending:
if self.load_metrics.cluster_full_of_actors_detected:
for request in pending:
self.event_summarizer.add_once_per_interval(
"Warning: The following resource request cannot be "
"scheduled right now: {}. This is likely due to all "
"cluster resources being claimed by actors. Consider "
"creating fewer actors or adding more nodes "
"to this Ray cluster.".format(request),
key="pending_{}".format(sorted(request.items())),
interval_s=30,
)
if infeasible:
for request in infeasible:
self.event_summarizer.add_once_per_interval(
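With the pending path gone, `_report_pending_infeasible` only surfaces requests that no node type can ever satisfy. A minimal, self-contained sketch of the remaining flow is below; the function name, callable parameters, and warning text are illustrative stand-ins for the diffed method, its `resource_demand_scheduler.is_feasible` check, and `event_summarizer.add_once_per_interval`, not the real API.

```python
from typing import Callable, Dict, List

ResourceDict = Dict[str, float]


def report_unfulfilled(
    unfulfilled: List[ResourceDict],
    is_feasible: Callable[[ResourceDict], bool],
    placement_group_shapes: List[ResourceDict],
    warn_once_per_interval: Callable[..., None],
) -> None:
    """Illustrative stand-in for the trimmed _report_pending_infeasible."""
    infeasible = []
    for bundle in unfulfilled:
        # Placement group demands are reported through a separate path.
        if bundle in placement_group_shapes:
            continue
        # The "pending but feasible" bucket (and its cluster-full-of-actors
        # warning) is gone; only impossible requests are surfaced here.
        if not is_feasible(bundle):
            infeasible.append(bundle)
    for request in infeasible:
        warn_once_per_interval(
            "Error: No available node types can fulfill resource request "
            f"{request}.",
            key=f"infeasible_{sorted(request.items())}",
            interval_s=30,
        )


# Toy usage with stand-in callables.
report_unfulfilled(
    unfulfilled=[{"GPU": 8.0}, {"CPU": 1.0}],
    is_feasible=lambda b: "GPU" not in b,
    placement_group_shapes=[],
    warn_once_per_interval=lambda msg, key, interval_s: print(msg),
)
```

As the removed block shows, `pending` was only used to emit the warning, so dropping it changes log output rather than scaling behavior.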
3 changes: 0 additions & 3 deletions python/ray/autoscaler/_private/load_metrics.py
@@ -78,7 +78,6 @@ def __init__(self):
self.infeasible_bundles = []
self.pending_placement_groups = []
self.resource_requests = []
self.cluster_full_of_actors_detected = False
self.ray_nodes_last_used_time_by_ip = {}

def __bool__(self):
@@ -97,11 +96,9 @@ def update(
waiting_bundles: List[Dict[str, float]] = None,
infeasible_bundles: List[Dict[str, float]] = None,
pending_placement_groups: List[PlacementGroupTableData] = None,
cluster_full_of_actors_detected: bool = False,
):
self.static_resources_by_ip[ip] = static_resources
self.node_id_by_ip[ip] = node_id
self.cluster_full_of_actors_detected = cluster_full_of_actors_detected

if not waiting_bundles:
waiting_bundles = []
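The same flag disappears from `LoadMetrics`: there is no `cluster_full_of_actors_detected` attribute to store and no keyword argument to accept. A hedged sketch with a stand-in class (it mirrors only the attributes and parameters visible in this diff; the real `update()` takes additional arguments not shown here):

```python
from typing import Dict, List, Optional

Bundle = Dict[str, float]


class LoadMetricsSketch:
    """Stand-in for illustration only; mirrors just the fields shown in the diff."""

    def __init__(self) -> None:
        self.static_resources_by_ip: Dict[str, Bundle] = {}
        self.node_id_by_ip: Dict[str, bytes] = {}
        self.waiting_bundles: List[Bundle] = []
        self.infeasible_bundles: List[Bundle] = []
        # Note: no cluster_full_of_actors_detected attribute any more.

    def update(
        self,
        ip: str,
        node_id: bytes,
        static_resources: Bundle,
        waiting_bundles: Optional[List[Bundle]] = None,
        infeasible_bundles: Optional[List[Bundle]] = None,
    ) -> None:
        self.static_resources_by_ip[ip] = static_resources
        self.node_id_by_ip[ip] = node_id
        self.waiting_bundles = waiting_bundles or []
        self.infeasible_bundles = infeasible_bundles or []


metrics = LoadMetricsSketch()
metrics.update("10.0.0.1", b"node-1", {"CPU": 8.0}, waiting_bundles=[{"CPU": 1.0}])
print(metrics.infeasible_bundles)  # [] -- feasibility alone now drives warnings
```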
14 changes: 0 additions & 14 deletions python/ray/autoscaler/_private/monitor.py
@@ -268,19 +268,6 @@ def update_load_metrics(self):
self.autoscaler.provider._set_nodes(new_nodes)

mirror_node_types = {}
legacy_cluster_full_detected = any(
getattr(entry, "cluster_full_of_actors_detected", False)
for entry in resources_batch_data.batch
)
cluster_full = legacy_cluster_full_detected or getattr(
response, "cluster_full_of_actors_detected_by_gcs", False
)
if (
hasattr(response, "cluster_full_of_actors_detected_by_gcs")
and response.cluster_full_of_actors_detected_by_gcs
):
# GCS has detected the cluster full of actors.
cluster_full = True
for resource_message in cluster_resource_state.node_states:
node_id = resource_message.node_id
# Generate node type config based on GCS reported node list.
@@ -341,7 +328,6 @@ def update_load_metrics(self):
waiting_bundles,
infeasible_bundles,
pending_placement_groups,
cluster_full,
)
if self.readonly_config:
self.readonly_config["available_node_types"].update(mirror_node_types)
56 changes: 0 additions & 56 deletions python/ray/tests/test_output.py
@@ -14,7 +14,6 @@
run_string_as_driver_nonblocking,
run_string_as_driver_stdout_stderr,
)
from ray.autoscaler.v2.utils import is_autoscaler_v2


def test_dedup_logs():
@@ -221,61 +220,6 @@ def _check_for_infeasible_msg():
proc.wait()


@pytest.mark.parametrize(
"ray_start_cluster_head_with_env_vars",
[
{
"num_cpus": 1,
"env_vars": {
"RAY_enable_autoscaler_v2": "0",
"RAY_debug_dump_period_milliseconds": "1000",
},
},
{
"num_cpus": 1,
"env_vars": {
"RAY_enable_autoscaler_v2": "1",
"RAY_debug_dump_period_milliseconds": "1000",
},
},
],
indirect=True,
)
def test_autoscaler_warn_deadlock(ray_start_cluster_head_with_env_vars):
script = """
import ray
import time

@ray.remote(num_cpus=1)
class A:
pass

ray.init(address="{address}")

# Only one of a or b can be scheduled, so the other will hang.
a = A.remote()
b = A.remote()
ray.get([a.__ray_ready__.remote(), b.__ray_ready__.remote()])
""".format(
address=ray_start_cluster_head_with_env_vars.address
)

proc = run_string_as_driver_nonblocking(script, env={"PYTHONUNBUFFERED": "1"})

if is_autoscaler_v2():
infeasible_msg = "No available node types can fulfill resource requests"
else:
infeasible_msg = "Warning: The following resource request cannot"

def _check_for_deadlock_msg():
l = proc.stdout.readline().decode("ascii")
if len(l) > 0:
print(l)
return "(autoscaler" in l and infeasible_msg in l

wait_for_condition(_check_for_deadlock_msg, timeout=30)


@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
def test_autoscaler_v2_stream_events_with_filter(shutdown_only):
"""Test that autoscaler v2 events are streamed to the driver."""
24 changes: 1 addition & 23 deletions src/ray/gcs/gcs_resource_manager.cc
@@ -47,11 +47,7 @@ void GcsResourceManager::ConsumeSyncMessage(
io_context_.dispatch(
[this, message]() {
if (message->message_type() == syncer::MessageType::COMMANDS) {
syncer::CommandsSyncMessage commands_sync_message;
commands_sync_message.ParseFromString(message->sync_message());
UpdateClusterFullOfActorsDetected(
NodeID::FromBinary(message->node_id()),
commands_sync_message.cluster_full_of_actors_detected());
// COMMANDS channel is currently unused.
} else if (message->message_type() == syncer::MessageType::RESOURCE_VIEW) {
syncer::ResourceViewSyncMessage resource_view_sync_message;
resource_view_sync_message.ParseFromString(message->sync_message());
@@ -206,12 +202,6 @@ void GcsResourceManager::HandleGetAllResourceUsage(
cluster_lease_manager_->FillPendingActorInfo(gcs_resources_data);
// Aggregate the load (pending actor info) of gcs.
FillAggregateLoad(gcs_resources_data, &aggregate_load);
// We only export gcs's pending info without adding the corresponding
// `ResourcesData` to the `batch` list. So if gcs has detected cluster full of
// actors, set the dedicated field in reply.
if (gcs_resources_data.cluster_full_of_actors_detected()) {
reply->set_cluster_full_of_actors_detected_by_gcs(true);
}
}

for (const auto &demand : aggregate_load) {
@@ -244,18 +234,6 @@ void GcsResourceManager::HandleGetAllResourceUsage(
++counts_[CountType::GET_ALL_RESOURCE_USAGE_REQUEST];
}

void GcsResourceManager::UpdateClusterFullOfActorsDetected(
const NodeID &node_id, bool cluster_full_of_actors_detected) {
auto iter = node_resource_usages_.find(node_id);
if (iter == node_resource_usages_.end()) {
return;
}

// TODO(rickyx): We should change this to be part of RESOURCE_VIEW.
// This is being populated from NodeManager as part of COMMANDS
iter->second.set_cluster_full_of_actors_detected(cluster_full_of_actors_detected);
}

void GcsResourceManager::UpdateNodeResourceUsage(
const NodeID &node_id,
const syncer::ResourceViewSyncMessage &resource_view_sync_message) {
7 changes: 0 additions & 7 deletions src/ray/gcs/gcs_resource_manager.h
@@ -138,13 +138,6 @@ class GcsResourceManager : public rpc::NodeResourceInfoGcsServiceHandler,
const NodeID &node_id,
const syncer::ResourceViewSyncMessage &resource_view_sync_message);

/// Update the resource usage of a node from syncer COMMANDS
///
/// This is currently used for setting cluster full of actors info from syncer.
/// \param data The resource report.
void UpdateClusterFullOfActorsDetected(const NodeID &node_id,
bool cluster_full_of_actors_detected);

/// Update the placement group load information so that it will be reported through
/// heartbeat.
///
43 changes: 7 additions & 36 deletions src/ray/gcs/tests/gcs_resource_manager_test.cc
@@ -149,42 +149,13 @@ TEST_F(GcsResourceManagerTest, TestResourceUsageFromDifferentSyncMsgs) {
resource_view_sync_message.mutable_resources_available()->insert({"CPU", 5});

// Update resource usage from resource view.
{
ASSERT_FALSE(gcs_resource_manager_->NodeResourceReportView()
.at(NodeID::FromBinary(node->node_id()))
.cluster_full_of_actors_detected());
gcs_resource_manager_->UpdateFromResourceView(NodeID::FromBinary(node->node_id()),
resource_view_sync_message);
ASSERT_EQ(
cluster_resource_manager_.GetNodeResources(scheduling::NodeID(node->node_id()))
.total.GetResourceMap()
.at("CPU"),
5);

ASSERT_FALSE(gcs_resource_manager_->NodeResourceReportView()
.at(NodeID::FromBinary(node->node_id()))
.cluster_full_of_actors_detected());
}

// Update from syncer COMMANDS will not update the resources, but the
// cluster_full_of_actors_detected flag. (This is how NodeManager currently
// updates potential resources deadlock).
{
gcs_resource_manager_->UpdateClusterFullOfActorsDetected(
NodeID::FromBinary(node->node_id()), true);

// Still 5 because the syncer COMMANDS message is ignored.
ASSERT_EQ(
cluster_resource_manager_.GetNodeResources(scheduling::NodeID(node->node_id()))
.total.GetResourceMap()
.at("CPU"),
5);

// The flag is updated.
ASSERT_TRUE(gcs_resource_manager_->NodeResourceReportView()
.at(NodeID::FromBinary(node->node_id()))
.cluster_full_of_actors_detected());
}
gcs_resource_manager_->UpdateFromResourceView(NodeID::FromBinary(node->node_id()),
resource_view_sync_message);
ASSERT_EQ(
cluster_resource_manager_.GetNodeResources(scheduling::NodeID(node->node_id()))
.total.GetResourceMap()
.at("CPU"),
5);
}

TEST_F(GcsResourceManagerTest, TestSetAvailableResourcesWhenNodeDead) {
4 changes: 1 addition & 3 deletions src/ray/protobuf/gcs.proto
@@ -506,7 +506,7 @@ message ResourceLoad {
}

message ResourcesData {
reserved 3, 8, 17;
reserved 3, 8, 14, 17;
// Node id.
bytes node_id = 1;
// Resource capacity currently available on this node manager.
@@ -530,8 +530,6 @@ message ResourcesData {
bool resources_normal_task_changed = 12;
// The timestamp that normal task resources are measured.
int64 resources_normal_task_timestamp = 13;
// Whether this node has detected a resource deadlock (full of actors).
bool cluster_full_of_actors_detected = 14;
// The duration in ms during which all the node's resources are idle. If the
// node currently has any resource being used, this will be 0.
int64 idle_duration_ms = 15;
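Note that field number 14 is added to the `reserved` list rather than simply deleted: an older raylet can still put tag 14 on the wire, and reserving the number guarantees no future field reuses it and misreads those bytes. A quick hedged check, assuming the generated bindings are importable as `ray.core.generated.gcs_pb2` (the usual layout in this repository) and that you are running a build that includes this change:

```python
from ray.core.generated import gcs_pb2

data = gcs_pb2.ResourcesData()
try:
    # Field 14 no longer exists on the message after this change.
    data.cluster_full_of_actors_detected = True
except AttributeError:
    print("field removed; number 14 stays reserved so it is never reused")
```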
4 changes: 0 additions & 4 deletions src/ray/protobuf/gcs_service.proto
@@ -723,10 +723,6 @@ message GetAllResourceUsageRequest {}
message GetAllResourceUsageReply {
GcsStatus status = 1;
ResourceUsageBatchData resource_usage_data = 2;
/// True if gcs finds infeasible or pending actor creation tasks
/// locally (when gcs actor scheduler is enabled). This field is
/// expected to help triggering auto-scaling.
bool cluster_full_of_actors_detected_by_gcs = 3;
}

message GetDrainingNodesRequest {}
2 changes: 0 additions & 2 deletions src/ray/protobuf/ray_syncer.proto
@@ -23,8 +23,6 @@ enum MessageType {
message CommandsSyncMessage {
// Whether this node manager is requesting global GC.
bool should_global_gc = 1;
// Whether this node has detected a resource deadlock (full of actors).
bool cluster_full_of_actors_detected = 2;
}

message ResourceViewSyncMessage {
2 changes: 0 additions & 2 deletions src/ray/raylet/node_manager.cc
@@ -3022,8 +3022,6 @@ std::optional<syncer::RaySyncMessage> NodeManager::CreateSyncMessage(
std::string serialized_commands_sync_msg;
syncer::CommandsSyncMessage commands_sync_message;
commands_sync_message.set_should_global_gc(true);
commands_sync_message.set_cluster_full_of_actors_detected(resource_deadlock_warned_ >=
1);
RAY_CHECK(commands_sync_message.SerializeToString(&serialized_commands_sync_msg));

// Populate the sync message.
1 change: 0 additions & 1 deletion src/ray/raylet/scheduling/scheduler_resource_reporter.cc
@@ -180,7 +180,6 @@ void SchedulerResourceReporter::FillPendingActorCountByShape(
}

if (!pending_count_by_shape.empty()) {
data.set_cluster_full_of_actors_detected(true);
auto resource_load_by_shape =
data.mutable_resource_load_by_shape()->mutable_resource_demands();
for (const auto &shape_entry : pending_count_by_shape) {