[Spot] Make sure the cluster status is not None when showing (#1464)
* Make sure the cluster status is not None when showing

* Fix another potential issue with NoneType of handle

* Add assert

* fix

* format

* Address comments

* Address comments

* format

* format

* fix

* fix

* fix spot cancellation

* format
Michaelvll authored and concretevitamin committed Dec 1, 2022
1 parent 27ac6bd commit f379c70
Showing 3 changed files with 54 additions and 20 deletions.
7 changes: 6 additions & 1 deletion sky/spot/controller.py
@@ -96,7 +96,12 @@ def _run(self):
                 self._cluster_name, force_refresh=True)
             if cluster_status != global_user_state.ClusterStatus.UP:
                 # recover the cluster if it is not up.
-                logger.info(f'Cluster status {cluster_status.value}. '
+                # The status could be None when the cluster is preempted
+                # right after the job was found FAILED.
+                cluster_status_str = ('is preempted'
+                                      if cluster_status is None else
+                                      f'status {cluster_status.value}')
+                logger.info(f'Cluster {cluster_status_str}. '
                             'Recovering...')
                 need_recovery = True
             if not need_recovery:
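The guard above exists because the refreshed cluster_status can come back as None when the cluster is preempted right after the job was found FAILED, and evaluating cluster_status.value on None raises AttributeError inside the controller loop. A minimal sketch of the pattern, using a stand-in ClusterStatus enum rather than importing SkyPilot's real global_user_state:

import enum
from typing import Optional


class ClusterStatus(enum.Enum):
    # Stand-in for global_user_state.ClusterStatus (illustrative values only).
    INIT = 'INIT'
    UP = 'UP'
    STOPPED = 'STOPPED'


def describe_status(cluster_status: Optional[ClusterStatus]) -> str:
    # Branch on None first: a just-preempted cluster has no status object,
    # so cluster_status.value would raise AttributeError.
    if cluster_status is None:
        return 'is preempted'
    return f'status {cluster_status.value}'


print(f'Cluster {describe_status(None)}. Recovering...')
print(f'Cluster {describe_status(ClusterStatus.STOPPED)}. Recovering...')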
63 changes: 45 additions & 18 deletions sky/spot/recovery_strategy.py
@@ -136,6 +136,10 @@ def _launch(self, max_retry=3, raise_on_failure=True) -> Optional[float]:
         Args:
             max_retry: The maximum number of retries. If None, retry forever.
             raise_on_failure: Whether to raise an exception if the launch fails.
+
+        Returns:
+            The job's start timestamp, or None if failed to start and
+            raise_on_failure is False.
         """
         # TODO(zhwu): handle the failure during `preparing sky runtime`.
         retry_cnt = 0
@@ -251,6 +255,31 @@ class FailoverStrategyExecutor(StrategyExecutor, name='FAILOVER', default=True):
 
     _MAX_RETRY_CNT = 240  # Retry for 4 hours.
 
+    def __init__(self, cluster_name: str, backend: 'backends.Backend',
+                 task: 'task_lib.Task', retry_until_up: bool) -> None:
+        super().__init__(cluster_name, backend, task, retry_until_up)
+        # Note down the cloud/region of the launched cluster, so that we can
+        # first retry in the same cloud/region. (Inside recover() we may not
+        # rely on cluster handle, as it can be None if the cluster is
+        # preempted.)
+        self._launched_cloud_region = None
+
+    def _launch(self, max_retry=3, raise_on_failure=True) -> Optional[float]:
+        launch_time = super()._launch(max_retry, raise_on_failure)
+        if launch_time is not None:
+            # Only record the cloud/region if the launch is successful.
+            handle = global_user_state.get_handle_from_cluster_name(
+                self.cluster_name)
+            assert handle is not None, 'Cluster should be launched.'
+            launched_resources = handle.launched_resources
+            self._launched_cloud_region = (launched_resources.cloud,
+                                           launched_resources.region)
+        return launch_time
+
+    def terminate_cluster(self, max_retry: int = 3) -> None:
+        super().terminate_cluster(max_retry)
+        self._launched_cloud_region = None
+
     def recover(self) -> float:
         # 1. Cancel the jobs and launch the cluster with the STOPPED status,
         #    so that it will try on the current region first until timeout.
@@ -264,26 +293,24 @@ def recover(self) -> float:
         # Retry the entire block until the cluster is up, so that the ratio of
         # the time spent in the current region and the time spent in the other
         # region is consistent during the retry.
-        handle = global_user_state.get_handle_from_cluster_name(
-            self.cluster_name)
         while True:
             # Add region constraint to the task, to retry on the same region
-            # first.
-            task = self.dag.tasks[0]
-            resources = list(task.resources)[0]
-            original_resources = resources
-
-            launched_cloud = handle.launched_resources.cloud
-            launched_region = handle.launched_resources.region
-            new_resources = resources.copy(cloud=launched_cloud,
-                                           region=launched_region)
-            task.set_resources({new_resources})
-            # Not using self.launch to avoid the retry until up logic.
-            launched_time = self._launch(raise_on_failure=False)
-            # Restore the original dag, i.e. reset the region constraint.
-            task.set_resources({original_resources})
-            if launched_time is not None:
-                return launched_time
+            # first (if valid).
+            if self._launched_cloud_region is not None:
+                task = self.dag.tasks[0]
+                resources = list(task.resources)[0]
+                original_resources = resources
+
+                launched_cloud, launched_region = self._launched_cloud_region
+                new_resources = resources.copy(cloud=launched_cloud,
+                                               region=launched_region)
+                task.set_resources({new_resources})
+                # Not using self.launch to avoid the retry until up logic.
+                launched_time = self._launch(raise_on_failure=False)
+                # Restore the original dag, i.e. reset the region constraint.
+                task.set_resources({original_resources})
+                if launched_time is not None:
+                    return launched_time
 
             # Step 2
             logger.debug('Terminating unhealthy spot cluster.')
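Taken together, the recovery_strategy.py changes replace a direct read of the cluster handle (which can be None after a preemption) with a cloud/region pair recorded on every successful launch and cleared when the cluster is terminated; recover() then only constrains the retry region when that cached pair exists. A rough standalone sketch of this caching pattern, with illustrative names rather than SkyPilot's actual classes:

from typing import Optional, Tuple


class FailoverSketch:
    """Illustrative cache of the last successfully launched cloud/region."""

    def __init__(self) -> None:
        self._launched_cloud_region: Optional[Tuple[str, str]] = None

    def launch(self, cloud: str, region: str, succeeded: bool) -> Optional[float]:
        # Only record the location when the launch succeeded, mirroring the
        # `launch_time is not None` check in the diff above.
        if succeeded:
            self._launched_cloud_region = (cloud, region)
            return 0.0  # stand-in for the job's start timestamp
        return None

    def terminate_cluster(self) -> None:
        # A terminated cluster has no "same region" worth retrying first.
        self._launched_cloud_region = None

    def recover_target(self) -> Optional[Tuple[str, str]]:
        # recover() retries the previous cloud/region only if one is cached;
        # otherwise it falls through to an unconstrained failover.
        return self._launched_cloud_region


executor = FailoverSketch()
executor.launch('aws', 'us-east-1', succeeded=True)
assert executor.recover_target() == ('aws', 'us-east-1')
executor.terminate_cluster()
assert executor.recover_target() is None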
4 changes: 3 additions & 1 deletion tests/test_smoke.py
@@ -937,10 +937,12 @@ def test_spot_cancellation():
             'sleep 10',
             f's=$(sky spot queue); printf "$s"; echo; echo; printf "$s" | grep {name}-3 | head -n1 | grep "CANCELLED"',
             'sleep 90',
+            # The cluster should be terminated (shutting-down) after cancellation. We don't use the `=` operator here because
+            # there can be multiple VM with the same name due to the recovery.
             (f's=$(aws ec2 describe-instances --region {region} '
              f'--filters Name=tag:ray-cluster-name,Values={name}-3* '
              f'--query Reservations[].Instances[].State[].Name '
-             '--output text) && printf "$s" && echo; [[ -z "$s" ]] || [[ "$s" = "terminated" ]] || [[ "$s" = "shutting-down" ]]'
+             '--output text) && printf "$s" && echo; [[ -z "$s" ]] || echo "$s" | grep -v -E "pending|running|stopped|stopping"'
            ),
        ])
    run_one_test(test)
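The old assertion compared the whole describe-instances output against a single state with `=`, which breaks once recovery leaves several instances carrying the same ray-cluster-name tag; the new check only fails if some instance is still in an alive state. A rough Python analog of that predicate (the test itself uses `grep -v -E` in shell; this sketch captures the intent, not the exact shell semantics):

def cluster_fully_terminated(states_output: str) -> bool:
    # states_output stands in for the whitespace-separated instance states
    # returned by the `aws ec2 describe-instances ... --output text` query.
    alive = {'pending', 'running', 'stopping', 'stopped'}
    return all(state not in alive for state in states_output.split())


# Empty output, a single terminated VM, or leftover VMs from recovery all pass:
assert cluster_fully_terminated('')
assert cluster_fully_terminated('terminated')
assert cluster_fully_terminated('terminated shutting-down')
# Any instance still pending/running/stopping/stopped fails the check:
assert not cluster_fully_terminated('terminated running')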
