[core][autoscaler] Fix incorrectly terminating nodes misclassified as idle in autoscaler v1 (ray-project#48519)


In autoscaler v1, nodes are incorrectly classified as idle based solely
on their resource usage metrics. This misclassification can occur under
the following conditions:
1. Tasks running on the node do not have assigned resources.
2. All tasks on the node are blocked on get or wait operations.

This leads to nodes being incorrectly terminated during downscaling.

To resolve this issue, use the `idle_duration_ms` reported by the raylet
instead, which already accounts for both of the conditions above. Ref:
ray-project#39582
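
For intuition, here is a minimal sketch of the difference (not the actual autoscaler code; the variables `static`, `dynamic`, and `idle_duration_ms` are illustrative stand-ins):

```
import time

now = time.time()

# Old v1 heuristic: a node counted as "in use" only while its available
# (dynamic) resources differed from its total (static) resources. A task that
# holds no resources, or that is blocked in ray.get()/ray.wait(), leaves
# dynamic == static, so the node looks idle even though work is in flight.
static = {"CPU": 10.0}
dynamic = {"CPU": 10.0}      # all CPUs "available" while the task blocks on get()
looks_idle_old = static == dynamic
print(looks_idle_old)        # True -> node may be terminated by mistake

# New approach: trust the raylet-reported idle duration, which already accounts
# for blocked workers and zero-resource tasks.
idle_duration_ms = 0         # raylet says the node is not idle
looks_idle_new = idle_duration_ms > 0
print(looks_idle_new)        # False -> node is kept
```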

### Before: NodeDiedError

![image](https://github.com/user-attachments/assets/a126af98-7950-40c4-ad43-2448f4b0d71a)
### After

![image](https://github.com/user-attachments/assets/ae5f6c74-6b7a-4684-a126-66e9a562149c)

### Reproduction Script (on local fake nodes)
- Setting:
  head node: < 10 CPUs, worker nodes: 10 CPUs each (a local fake-cluster setup sketch follows the code below)
- Code:
  ```
  import time

  import ray

  @ray.remote(max_retries=5, num_cpus=10)
  def inside_ray_task_with_outside():
      # Inner task 1: busy-waits ~15 s while holding 10 CPUs on a worker node.
      print('start inside_ray_task_with_outside')
      sleep_time = 15
      start_time = time.perf_counter()
      while time.perf_counter() - start_time < sleep_time:
          time.sleep(0.001)

  @ray.remote(max_retries=5, num_cpus=10)
  def inside_ray_task_without_outside():
      # Inner task 2: busy-waits ~50 s, outliving the autoscaler's idle timeout.
      print('start inside_ray_task_without_outside task')
      sleep_time = 50
      start_time = time.perf_counter()
      while time.perf_counter() - start_time < sleep_time:
          time.sleep(0.001)

  @ray.remote(max_retries=0, num_cpus=10)
  def outside_ray_task():
      # Outer task: blocks in ray.get(), so its node reports all resources as
      # available and is misclassified as idle by the old heuristic.
      print('start outside_ray_task task')
      future_list = [
          inside_ray_task_with_outside.remote(),
          inside_ray_task_without_outside.remote(),
      ]
      ray.get(future_list)

  if __name__ == '__main__':
      ray.init()
      ray.get(outside_ray_task.remote())
  ```
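
For a local fake-node setup matching the setting above, one option is Ray's fake autoscaling cluster utility. The sketch below is an assumption about a workable configuration (the exact resource numbers, the node-type name `worker_10cpu`, and the forwarded `idle_timeout_minutes` option may need adjusting for your Ray version); it is not necessarily the environment used for the screenshots:

```
import ray
from ray.cluster_utils import AutoscalingCluster

# Hypothetical local fake cluster: a head node with fewer than 10 CPUs and
# worker nodes with 10 CPUs each, plus a short idle timeout so that
# downscaling is easy to observe.
cluster = AutoscalingCluster(
    head_resources={"CPU": 4},
    worker_node_types={
        "worker_10cpu": {
            "resources": {"CPU": 10},
            "node_config": {},
            "min_workers": 0,
            "max_workers": 2,
        },
    },
    idle_timeout_minutes=1,
)

cluster.start()
try:
    ray.init(address="auto")
    # Submit outside_ray_task here, as in the reproduction script above.
finally:
    ray.shutdown()
    cluster.shutdown()
```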

## Related issue number

<!-- For example: "Closes ray-project#1234" -->
Closes ray-project#46492

## Checks

- [ ] I've signed off every commit (by using the -s flag, i.e., `git
commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a method in Tune, I've added it in `doc/source/tune/api/` under the
corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: Mimi Liao <mimiliao2000@gmail.com>
Signed-off-by: ujjawal-khare <ujjawal.khare@dream11.com>
mimiliaogo authored and ujjawal-khare committed Dec 17, 2024
1 parent 30acfe9 commit cfbca6a
Showing 7 changed files with 282 additions and 115 deletions.
10 changes: 7 additions & 3 deletions python/ray/autoscaler/_private/autoscaler.py
@@ -490,8 +490,11 @@ def terminate_nodes_to_enforce_config_constraints(self, now: float):
assert self.non_terminated_nodes
assert self.provider

- last_used = self.load_metrics.last_used_time_by_ip
- horizon = now - (60 * self.config["idle_timeout_minutes"])
+ last_used = self.load_metrics.ray_nodes_last_used_time_by_ip
+
+ idle_timeout_s = 60 * self.config["idle_timeout_minutes"]
+
+ last_used_cutoff = now - idle_timeout_s

# Sort based on last used to make sure to keep min_workers that
# were most recently used. Otherwise, _keep_min_workers_of_node_type
@@ -539,7 +542,8 @@ def keep_node(node_id: NodeID) -> None:
continue

node_ip = self.provider.internal_ip(node_id)
- if node_ip in last_used and last_used[node_ip] < horizon:
+
+ if node_ip in last_used and last_used[node_ip] < last_used_cutoff:
self.schedule_node_termination(node_id, "idle", logger.info)
# Get the local time of the node's last use as a string.
formatted_last_used_time = time.asctime(
13 changes: 5 additions & 8 deletions python/ray/autoscaler/_private/load_metrics.py
@@ -70,7 +70,6 @@ class LoadMetrics:
"""

def __init__(self):
- self.last_used_time_by_ip = {}
self.last_heartbeat_time_by_ip = {}
self.static_resources_by_ip = {}
self.dynamic_resources_by_ip = {}
@@ -80,6 +79,7 @@ def __init__(self):
self.pending_placement_groups = []
self.resource_requests = []
self.cluster_full_of_actors_detected = False
+ self.ray_nodes_last_used_time_by_ip = {}

def __bool__(self):
"""A load metrics instance is Falsey iff the autoscaler process
@@ -93,6 +93,7 @@ def update(
raylet_id: bytes,
static_resources: Dict[str, Dict],
dynamic_resources: Dict[str, Dict],
+ node_idle_duration_s: float,
waiting_bundles: List[Dict[str, float]] = None,
infeasible_bundles: List[Dict[str, float]] = None,
pending_placement_groups: List[PlacementGroupTableData] = None,
@@ -120,11 +121,7 @@
self.dynamic_resources_by_ip[ip] = dynamic_resources_update

now = time.time()
- if (
-     ip not in self.last_used_time_by_ip
-     or self.static_resources_by_ip[ip] != self.dynamic_resources_by_ip[ip]
- ):
-     self.last_used_time_by_ip[ip] = now
+ self.ray_nodes_last_used_time_by_ip[ip] = now - node_idle_duration_s
self.last_heartbeat_time_by_ip[ip] = now
self.waiting_bundles = waiting_bundles
self.infeasible_bundles = infeasible_bundles
@@ -167,7 +164,7 @@ def prune(mapping, should_log):
)
assert not (unwanted_ips & set(mapping))

- prune(self.last_used_time_by_ip, should_log=True)
+ prune(self.ray_nodes_last_used_time_by_ip, should_log=True)
prune(self.static_resources_by_ip, should_log=False)
prune(self.raylet_id_by_ip, should_log=False)
prune(self.dynamic_resources_by_ip, should_log=False)
@@ -337,7 +334,7 @@ def _info(self):
resources_used, resources_total = self._get_resource_usage()

now = time.time()
- idle_times = [now - t for t in self.last_used_time_by_ip.values()]
+ idle_times = [now - t for t in self.ray_nodes_last_used_time_by_ip.values()]
heartbeat_times = [now - t for t in self.last_heartbeat_time_by_ip.values()]
most_delayed_heartbeats = sorted(
self.last_heartbeat_time_by_ip.items(), key=lambda pair: pair[1]
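Taken together with the autoscaler.py change above, the new bookkeeping works roughly as follows; the numbers here are made up purely for illustration:

```
# Say the raylet reports the node as idle for 2 minutes, and the cluster is
# configured with idle_timeout_minutes = 5.
now = 1_700_000_000.0             # arbitrary "current" timestamp, in seconds
node_idle_duration_s = 120.0      # idle_duration_ms / 1000, as passed to LoadMetrics.update()

# LoadMetrics back-dates the node's last-used time from the reported idle duration.
last_used = now - node_idle_duration_s       # stored in ray_nodes_last_used_time_by_ip[ip]

# The autoscaler only terminates a node that was last used before the cutoff.
idle_timeout_s = 60 * 5
last_used_cutoff = now - idle_timeout_s

print(last_used < last_used_cutoff)          # False: idle 2 min < 5 min timeout, node is kept
```
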
20 changes: 20 additions & 0 deletions python/ray/autoscaler/_private/monitor.py
@@ -30,6 +30,7 @@
from ray.autoscaler._private.load_metrics import LoadMetrics
from ray.autoscaler._private.prom_metrics import AutoscalerPrometheusMetrics
from ray.autoscaler._private.util import format_readonly_node_type
+ from ray.autoscaler.v2.sdk import get_cluster_resource_state
from ray.core.generated import gcs_pb2
from ray.core.generated.event_pb2 import Event as RayEvent
from ray.experimental.internal_kv import (
@@ -245,6 +246,15 @@ def update_load_metrics(self):
resources_batch_data = response.resource_usage_data
log_resource_batch_data_if_desired(resources_batch_data)

+ # This is a workaround to get correct idle_duration_ms
+ # from "get_cluster_resource_state"
+ # ref: https://github.com/ray-project/ray/pull/48519#issuecomment-2481659346
+ cluster_resource_state = get_cluster_resource_state(self.gcs_client)
+ ray_node_states = cluster_resource_state.node_states
+ ray_nodes_idle_duration_ms_by_id = {
+     node.node_id: node.idle_duration_ms for node in ray_node_states
+ }
+
# Tell the readonly node provider what nodes to report.
if self.readonly_config:
new_nodes = []
@@ -309,11 +319,21 @@
ip = node_id.hex()
else:
ip = resource_message.node_manager_address

+ idle_duration_s = 0.0
+ if node_id in ray_nodes_idle_duration_ms_by_id:
+     idle_duration_s = ray_nodes_idle_duration_ms_by_id[node_id] / 1000
+ else:
+     logger.warning(
+         f"node_id {node_id} not found in ray_nodes_idle_duration_ms_by_id"
+     )

self.load_metrics.update(
ip,
node_id,
total_resources,
available_resources,
+ idle_duration_s,
waiting_bundles,
infeasible_bundles,
pending_placement_groups,
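As a rough end-to-end summary of how the pieces fit together after this change (paraphrased from the diffs above, not a verbatim call chain):

```
# Monitor.update_load_metrics():
#     idle_duration_ms per node  <- GCS, via get_cluster_resource_state()
#     load_metrics.update(..., node_idle_duration_s=idle_duration_ms / 1000, ...)
#
# LoadMetrics.update():
#     ray_nodes_last_used_time_by_ip[ip] = now - node_idle_duration_s
#
# Autoscaler, terminate_nodes_to_enforce_config_constraints():
#     schedule a node for "idle" termination only if
#     last_used[node_ip] < now - 60 * idle_timeout_minutes
```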
