[Spot] Make sure the cluster status is not None when showing #1464

Merged: 13 commits merged on Nov 29, 2022
sky/spot/controller.py (7 changes: 6 additions & 1 deletion)
@@ -96,7 +96,12 @@ def _run(self):
self._cluster_name, force_refresh=True)
if cluster_status != global_user_state.ClusterStatus.UP:
# recover the cluster if it is not up.
logger.info(f'Cluster status {cluster_status.value}. '
# The status could be None when the cluster is preempted
# right after the job was found FAILED.
cluster_status_str = ('is preempted'
if cluster_status is None else
f'status {cluster_status.value}')
logger.info(f'Cluster {cluster_status_str}. '
'Recovering...')
need_recovery = True
if not need_recovery:
sky/spot/recovery_strategy.py (29 changes: 25 additions & 4 deletions)
@@ -251,6 +251,29 @@ class FailoverStrategyExecutor(StrategyExecutor, name='FAILOVER', default=True):

_MAX_RETRY_CNT = 240 # Retry for 4 hours.

def __init__(self, cluster_name: str, backend: 'backends.Backend',
task: 'task_lib.Task', retry_until_up: bool) -> None:
super().__init__(cluster_name, backend, task, retry_until_up)
# Note down the cloud/region of the launched cluster, so that we can
# first retry in the same cloud/region. (Inside recover() we may not
# rely on cluster handle, as it can be None if the cluster is
# preempted.)
self._launched_cloud_region = None

def _launch(self, max_retry=3, raise_on_failure=True) -> Optional[float]:
launch_time = super()._launch(max_retry, raise_on_failure)
Member:
  1. Document a Returns: block in _launch()? In particular, it may return None if the launch failed (see the sketch below).
  2. Can we guard against None here?
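
For illustration only, a minimal sketch of what such a Returns: block could look like (the wording is an assumption, not the docstring that was merged):

    def _launch(self, max_retry=3, raise_on_failure=True) -> Optional[float]:
        """Launch the spot cluster.

        Returns:
            The launch time (timestamp) on success, or None if the launch
            failed and raise_on_failure is False.
        """
        ...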

Member:
Oops, spoke too soon: raise_on_failure=True means it's return-or-die. Still, a Returns: block would be helpful!

Collaborator (Author):
Good catch! Added the guard for the launch; otherwise, the assert would fail when the launch fails, even when raise_on_failure is False (see the sketch below).
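
A minimal sketch of the guard being described, assuming super()._launch() returns None on failure when raise_on_failure is False (not necessarily the exact merged code):

    def _launch(self, max_retry=3, raise_on_failure=True) -> Optional[float]:
        launch_time = super()._launch(max_retry, raise_on_failure)
        if launch_time is None:
            # The launch failed and raise_on_failure is False: skip recording
            # the cloud/region so the assert below is not triggered.
            return None
        handle = global_user_state.get_handle_from_cluster_name(
            self.cluster_name)
        assert handle is not None, 'Cluster should be launched.'
        launched_resources = handle.launched_resources
        self._launched_cloud_region = (launched_resources.cloud,
                                       launched_resources.region)
        return launch_time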

handle = global_user_state.get_handle_from_cluster_name(
self.cluster_name)
assert handle is not None, 'Cluster should be launched.'
launched_resources = handle.launched_resources
self._launched_cloud_region = (launched_resources.cloud,
launched_resources.region)
return launch_time

def terminate_cluster(self, max_retry: int = 3) -> None:
super().terminate_cluster(max_retry)
self._launched_cloud_region = None

def recover(self) -> float:
# 1. Cancel the jobs and launch the cluster with the STOPPED status,
# so that it will try on the current region first until timeout.
@@ -264,17 +287,15 @@ def recover(self) -> float:
# Retry the entire block until the cluster is up, so that the ratio of
# the time spent in the current region and the time spent in the other
# region is consistent during the retry.
handle = global_user_state.get_handle_from_cluster_name(
self.cluster_name)
while True:
# Add region constraint to the task, to retry on the same region
# first.
task = self.dag.tasks[0]
resources = list(task.resources)[0]
original_resources = resources

launched_cloud = handle.launched_resources.cloud
launched_region = handle.launched_resources.region
assert self._launched_cloud_region is not None
launched_cloud, launched_region = self._launched_cloud_region
new_resources = resources.copy(cloud=launched_cloud,
region=launched_region)
task.set_resources({new_resources})