From 66adadbba48ac4d0927aa3380a99f0c63216ec07 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 28 Nov 2022 17:40:23 -0800 Subject: [PATCH 01/13] Make sure the cluster status is not None when showing --- sky/spot/controller.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sky/spot/controller.py b/sky/spot/controller.py index 16c2459603b..27640bbfe6a 100644 --- a/sky/spot/controller.py +++ b/sky/spot/controller.py @@ -96,7 +96,12 @@ def _run(self): self._cluster_name, force_refresh=True) if cluster_status != global_user_state.ClusterStatus.UP: # recover the cluster if it is not up. - logger.info(f'Cluster status {cluster_status.value}. ' + # The status could be None when the cluster is preempted + # right after the job was found FAILED. + cluster_status_str = ('is preempted' + if cluster_status is None else + f'status {cluster_status.value}') + logger.info(f'Cluster {cluster_status_str}. ' 'Recovering...') need_recovery = True if not need_recovery: From eb1cf31f21c9b151b79fd2cefe7cdc7bc2df25ad Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 28 Nov 2022 20:15:46 -0800 Subject: [PATCH 02/13] Fix another potential issue with NoneType of handle --- sky/spot/recovery_strategy.py | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/sky/spot/recovery_strategy.py b/sky/spot/recovery_strategy.py index 17d870ecfcc..e5c640802b3 100644 --- a/sky/spot/recovery_strategy.py +++ b/sky/spot/recovery_strategy.py @@ -251,6 +251,25 @@ class FailoverStrategyExecutor(StrategyExecutor, name='FAILOVER', default=True): _MAX_RETRY_CNT = 240 # Retry for 4 hours. + def __init__(self, cluster_name: str, backend: 'backends.Backend', + task: 'task_lib.Task', retry_until_up: bool) -> None: + super().__init__(cluster_name, backend, task, retry_until_up) + # Note down the cloud/region of the launched cluster, so that we can + # first retry in the same cloud/region. 
(we cannot use handle, as it + # can be None if the cluster is preempted) + self.launched_cloud_region = None + + def launch(self) -> Optional[float]: + launch_time = super().launch() + handle = global_user_state.get_cluster_from_name(self.cluster_name) + launched_resources = handle.launched_resources + self.launched_cloud_region = (launched_resources.cloud, launched_resources.region) + return launch_time + + def terminate_cluster(self, max_retry: int = 3) -> None: + super().terminate_cluster(max_retry) + self.launched_resources = None + def recover(self) -> float: # 1. Cancel the jobs and launch the cluster with the STOPPED status, # so that it will try on the current region first until timeout. @@ -264,8 +283,6 @@ def recover(self) -> float: # Retry the entire block until the cluster is up, so that the ratio of # the time spent in the current region and the time spent in the other # region is consistent during the retry. - handle = global_user_state.get_handle_from_cluster_name( - self.cluster_name) while True: # Add region constraint to the task, to retry on the same region # first. 
@@ -273,8 +290,8 @@ def recover(self) -> float: resources = list(task.resources)[0] original_resources = resources - launched_cloud = handle.launched_resources.cloud - launched_region = handle.launched_resources.region + assert self.launched_cloud_region is not None + launched_cloud, launched_region = self.launched_cloud_region new_resources = resources.copy(cloud=launched_cloud, region=launched_region) task.set_resources({new_resources}) From 87084b279cbed55efaf723f3a5cedbecba3ac56a Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 28 Nov 2022 20:20:15 -0800 Subject: [PATCH 03/13] Add assert --- sky/spot/recovery_strategy.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sky/spot/recovery_strategy.py b/sky/spot/recovery_strategy.py index e5c640802b3..0c6c4f2eb7a 100644 --- a/sky/spot/recovery_strategy.py +++ b/sky/spot/recovery_strategy.py @@ -258,14 +258,16 @@ def __init__(self, cluster_name: str, backend: 'backends.Backend', # first retry in the same cloud/region. (we cannot use handle, as it # can be None if the cluster is preempted) self.launched_cloud_region = None - + def launch(self) -> Optional[float]: launch_time = super().launch() handle = global_user_state.get_cluster_from_name(self.cluster_name) + assert handle is not None, 'Cluster should be launched.' 
launched_resources = handle.launched_resources - self.launched_cloud_region = (launched_resources.cloud, launched_resources.region) + self.launched_cloud_region = (launched_resources.cloud, + launched_resources.region) return launch_time - + def terminate_cluster(self, max_retry: int = 3) -> None: super().terminate_cluster(max_retry) self.launched_resources = None From 06efb3ec5fc693840afc38723f4c157361330994 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 28 Nov 2022 20:27:28 -0800 Subject: [PATCH 04/13] fix --- sky/spot/recovery_strategy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sky/spot/recovery_strategy.py b/sky/spot/recovery_strategy.py index 0c6c4f2eb7a..cda263943b5 100644 --- a/sky/spot/recovery_strategy.py +++ b/sky/spot/recovery_strategy.py @@ -261,7 +261,7 @@ def __init__(self, cluster_name: str, backend: 'backends.Backend', def launch(self) -> Optional[float]: launch_time = super().launch() - handle = global_user_state.get_cluster_from_name(self.cluster_name) + handle = global_user_state.get_handle_from_cluster_name(self.cluster_name) assert handle is not None, 'Cluster should be launched.' 
launched_resources = handle.launched_resources self.launched_cloud_region = (launched_resources.cloud, From 27c7b3586ab9b704745d1794a444fe49b837990a Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 28 Nov 2022 20:32:49 -0800 Subject: [PATCH 05/13] format --- sky/spot/recovery_strategy.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sky/spot/recovery_strategy.py b/sky/spot/recovery_strategy.py index cda263943b5..790524055f1 100644 --- a/sky/spot/recovery_strategy.py +++ b/sky/spot/recovery_strategy.py @@ -261,7 +261,8 @@ def __init__(self, cluster_name: str, backend: 'backends.Backend', def launch(self) -> Optional[float]: launch_time = super().launch() - handle = global_user_state.get_handle_from_cluster_name(self.cluster_name) + handle = global_user_state.get_handle_from_cluster_name( + self.cluster_name) assert handle is not None, 'Cluster should be launched.' launched_resources = handle.launched_resources self.launched_cloud_region = (launched_resources.cloud, From 543a30d14e4bf0cb679fee3e41473589afdba2d6 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 28 Nov 2022 21:21:52 -0800 Subject: [PATCH 06/13] Address comments --- sky/spot/recovery_strategy.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/sky/spot/recovery_strategy.py b/sky/spot/recovery_strategy.py index 790524055f1..d731c06b819 100644 --- a/sky/spot/recovery_strategy.py +++ b/sky/spot/recovery_strategy.py @@ -255,18 +255,19 @@ def __init__(self, cluster_name: str, backend: 'backends.Backend', task: 'task_lib.Task', retry_until_up: bool) -> None: super().__init__(cluster_name, backend, task, retry_until_up) # Note down the cloud/region of the launched cluster, so that we can - # first retry in the same cloud/region. (we cannot use handle, as it - # can be None if the cluster is preempted) - self.launched_cloud_region = None + # first retry in the same cloud/region. 
(Inside recover() we may not + # rely on cluster handle, as it can be None if the cluster is + # preempted.) + self._launched_cloud_region = None - def launch(self) -> Optional[float]: - launch_time = super().launch() + def _launch(self, max_retry=3, raise_on_failure=True) -> Optional[float]: + launch_time = super()._launch(max_retry, raise_on_failure) handle = global_user_state.get_handle_from_cluster_name( self.cluster_name) assert handle is not None, 'Cluster should be launched.' launched_resources = handle.launched_resources - self.launched_cloud_region = (launched_resources.cloud, - launched_resources.region) + self._launched_cloud_region = (launched_resources.cloud, + launched_resources.region) return launch_time def terminate_cluster(self, max_retry: int = 3) -> None: @@ -293,8 +294,8 @@ def recover(self) -> float: resources = list(task.resources)[0] original_resources = resources - assert self.launched_cloud_region is not None - launched_cloud, launched_region = self.launched_cloud_region + assert self._launched_cloud_region is not None + launched_cloud, launched_region = self._launched_cloud_region new_resources = resources.copy(cloud=launched_cloud, region=launched_region) task.set_resources({new_resources}) From 70edfaf7e638f5d249c69ed36c78cd4a861fa44c Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 28 Nov 2022 22:09:46 -0800 Subject: [PATCH 07/13] Address comments --- sky/spot/recovery_strategy.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/sky/spot/recovery_strategy.py b/sky/spot/recovery_strategy.py index d731c06b819..7bb566ccbe5 100644 --- a/sky/spot/recovery_strategy.py +++ b/sky/spot/recovery_strategy.py @@ -136,6 +136,10 @@ def _launch(self, max_retry=3, raise_on_failure=True) -> Optional[float]: Args: max_retry: The maximum number of retries. If None, retry forever. raise_on_failure: Whether to raise an exception if the launch fails. 
+ + Returns: + The job's start timestamp, or None if failed to start and + raise_on_failure is False. """ # TODO(zhwu): handle the failure during `preparing sky runtime`. retry_cnt = 0 @@ -262,12 +266,14 @@ def __init__(self, cluster_name: str, backend: 'backends.Backend', def _launch(self, max_retry=3, raise_on_failure=True) -> Optional[float]: launch_time = super()._launch(max_retry, raise_on_failure) - handle = global_user_state.get_handle_from_cluster_name( - self.cluster_name) - assert handle is not None, 'Cluster should be launched.' - launched_resources = handle.launched_resources - self._launched_cloud_region = (launched_resources.cloud, - launched_resources.region) + if launch_time is not None: + # Only record the cloud/region if the launch is successful. + handle = global_user_state.get_handle_from_cluster_name( + self.cluster_name) + assert handle is not None, 'Cluster should be launched.' + launched_resources = handle.launched_resources + self._launched_cloud_region = (launched_resources.cloud, + launched_resources.region) return launch_time def terminate_cluster(self, max_retry: int = 3) -> None: From a734293b8eddbe43c48f26669251de8bd86721b5 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 28 Nov 2022 22:13:01 -0800 Subject: [PATCH 08/13] format --- sky/spot/recovery_strategy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sky/spot/recovery_strategy.py b/sky/spot/recovery_strategy.py index 7bb566ccbe5..9a957f1018b 100644 --- a/sky/spot/recovery_strategy.py +++ b/sky/spot/recovery_strategy.py @@ -273,7 +273,7 @@ def _launch(self, max_retry=3, raise_on_failure=True) -> Optional[float]: assert handle is not None, 'Cluster should be launched.' 
launched_resources = handle.launched_resources self._launched_cloud_region = (launched_resources.cloud, - launched_resources.region) + launched_resources.region) return launch_time def terminate_cluster(self, max_retry: int = 3) -> None: From 500ac1b74edfa294a8a8ba433a79c5a5028b80de Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 28 Nov 2022 22:13:23 -0800 Subject: [PATCH 09/13] format --- sky/spot/recovery_strategy.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sky/spot/recovery_strategy.py b/sky/spot/recovery_strategy.py index 9a957f1018b..ecf3e53e918 100644 --- a/sky/spot/recovery_strategy.py +++ b/sky/spot/recovery_strategy.py @@ -136,8 +136,8 @@ def _launch(self, max_retry=3, raise_on_failure=True) -> Optional[float]: Args: max_retry: The maximum number of retries. If None, retry forever. raise_on_failure: Whether to raise an exception if the launch fails. - - Returns: + + Returns: The job's start timestamp, or None if failed to start and raise_on_failure is False. """ From e0b5bd6736c6417374ae3a830f14d77fd274a190 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 28 Nov 2022 22:18:21 -0800 Subject: [PATCH 10/13] fix --- sky/spot/recovery_strategy.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/sky/spot/recovery_strategy.py b/sky/spot/recovery_strategy.py index ecf3e53e918..7fd71038c84 100644 --- a/sky/spot/recovery_strategy.py +++ b/sky/spot/recovery_strategy.py @@ -295,22 +295,22 @@ def recover(self) -> float: # region is consistent during the retry. while True: # Add region constraint to the task, to retry on the same region - # first. 
- task = self.dag.tasks[0] - resources = list(task.resources)[0] - original_resources = resources - - assert self._launched_cloud_region is not None - launched_cloud, launched_region = self._launched_cloud_region - new_resources = resources.copy(cloud=launched_cloud, - region=launched_region) - task.set_resources({new_resources}) - # Not using self.launch to avoid the retry until up logic. - launched_time = self._launch(raise_on_failure=False) - # Restore the original dag, i.e. reset the region constraint. - task.set_resources({original_resources}) - if launched_time is not None: - return launched_time + # first (if valid). + if self._launched_cloud_region is not None: + task = self.dag.tasks[0] + resources = list(task.resources)[0] + original_resources = resources + + launched_cloud, launched_region = self._launched_cloud_region + new_resources = resources.copy(cloud=launched_cloud, + region=launched_region) + task.set_resources({new_resources}) + # Not using self.launch to avoid the retry until up logic. + launched_time = self._launch(raise_on_failure=False) + # Restore the original dag, i.e. reset the region constraint. 
+ task.set_resources({original_resources}) + if launched_time is not None: + return launched_time # Step 2 logger.debug('Terminating unhealthy spot cluster.') From e01ab9acb081b50707bd95fb115edb6ab668256a Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 28 Nov 2022 22:29:23 -0800 Subject: [PATCH 11/13] fix --- sky/spot/recovery_strategy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sky/spot/recovery_strategy.py b/sky/spot/recovery_strategy.py index 7fd71038c84..3459cb2edf7 100644 --- a/sky/spot/recovery_strategy.py +++ b/sky/spot/recovery_strategy.py @@ -278,7 +278,7 @@ def _launch(self, max_retry=3, raise_on_failure=True) -> Optional[float]: def terminate_cluster(self, max_retry: int = 3) -> None: super().terminate_cluster(max_retry) - self.launched_resources = None + self._launched_cloud_region = None def recover(self) -> float: # 1. Cancel the jobs and launch the cluster with the STOPPED status, From 347f0bd8853bfacf31b290c43d5a14acf22658fb Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Tue, 29 Nov 2022 00:15:27 -0800 Subject: [PATCH 12/13] fix spot cancellation --- tests/test_smoke.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 4fc0bb0b82a..85c93e17ac4 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -903,11 +903,12 @@ def test_spot_cancellation(): 'sleep 10', f's=$(sky spot queue); printf "$s"; echo; echo; printf "$s" | grep {name}-3 | head -n1 | grep "CANCELLED"', 'sleep 90', + # The cluster should be terminated (shutting-down) after cancellation. We don't use the `=` operator here because + # there can be multiple VMs with the same name due to the recovery. 
(f's=$(aws ec2 describe-instances --region {region} ' f'--filters Name=tag:ray-cluster-name,Values={name}-3* ' f'--query Reservations[].Instances[].State[].Name ' - '--output text) && printf "$s" && echo; [[ -z "$s" ]] || [[ "$s" = "terminated" ]] || [[ "$s" = "shutting-down" ]]' - ), + '--output text) && printf "$s" && echo; [[ -z "$s" ]] || echo "$s" | grep -v -E "pending|running|stopped|stopping"'), ]) run_one_test(test) From 0234becf01d1427605fd675c170f78241b5c88fb Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Tue, 29 Nov 2022 00:16:13 -0800 Subject: [PATCH 13/13] format --- tests/test_smoke.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 85c93e17ac4..c81d0c05ce6 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -908,7 +908,8 @@ def test_spot_cancellation(): (f's=$(aws ec2 describe-instances --region {region} ' f'--filters Name=tag:ray-cluster-name,Values={name}-3* ' f'--query Reservations[].Instances[].State[].Name ' - '--output text) && printf "$s" && echo; [[ -z "$s" ]] || echo "$s" | grep -v -E "pending|running|stopped|stopping"'), + '--output text) && printf "$s" && echo; [[ -z "$s" ]] || echo "$s" | grep -v -E "pending|running|stopped|stopping"' + ), ]) run_one_test(test)