From 98194d80fdebb85cd19af85f25868215d963ea1d Mon Sep 17 00:00:00 2001 From: Ruben Laguna Date: Fri, 6 May 2022 11:59:43 +0200 Subject: [PATCH 1/2] Prevent KubernetesJobWatcher getting stuck on resource too old If the watch fails because "resource too old" the KubernetesJobWatcher should not retry with the same resource version as that will end up in loop where there is no progress. --- airflow/executors/kubernetes_executor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 9b9de71681cf6..01dc91144ec37 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -109,6 +109,7 @@ def run(self) -> None: time.sleep(1) except Exception: self.log.exception('Unknown error in KubernetesJobWatcher. Failing') + ResourceVersion().resource_version = "0" raise else: self.log.warning( From ccf6cc2daa9b3115ee7fe3a5dc6bcabb3f02e71c Mon Sep 17 00:00:00 2001 From: Ruben Laguna Date: Sat, 7 May 2022 00:51:33 +0200 Subject: [PATCH 2/2] Reset ResourceVersion().resource_version to 0 --- airflow/executors/kubernetes_executor.py | 2 ++ tests/executors/test_kubernetes_executor.py | 34 +++++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 01dc91144ec37..c76cf58f418d4 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -109,6 +109,7 @@ def run(self) -> None: time.sleep(1) except Exception: self.log.exception('Unknown error in KubernetesJobWatcher. Failing') + self.resource_version = "0" ResourceVersion().resource_version = "0" raise else: @@ -289,6 +290,7 @@ def _health_check_kube_watcher(self): self.log.error( 'Error while health checking kube watcher process. Process died for unknown reasons' ) + ResourceVersion().resource_version = "0" self.kube_watcher = self._make_kube_watcher() def run_next(self, next_job: KubernetesJobType) -> None: diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index a677fe598b47f..a332a2fd6adfa 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -39,6 +39,7 @@ AirflowKubernetesScheduler, KubernetesExecutor, KubernetesJobWatcher, + ResourceVersion, create_pod_id, get_base_pod_from_template, ) @@ -957,3 +958,36 @@ def test_process_error_event_for_raise_if_not_410(self): f"Kubernetes failure for {raw_object['reason']} " f"with code {raw_object['code']} and message: {raw_object['message']}" ) + + def test_recover_from_resource_too_old(self): + # too old resource + mock_underscore_run = mock.MagicMock() + + def effect(): + yield '500' + while True: + yield Exception('sentinel') + + mock_underscore_run.side_effect = effect() + + self.watcher._run = mock_underscore_run + + with mock.patch('airflow.executors.kubernetes_executor.get_kube_client'): + try: + # self.watcher._run() is mocked and return "500" as last resource_version + self.watcher.run() + except Exception as e: + assert e.args == ('sentinel',) + + # both resource_version should be 0 after _run raises and exception + assert self.watcher.resource_version == '0' + assert ResourceVersion().resource_version == '0' + + # check that in the next run, _run is invoked with resource_version = 0 + mock_underscore_run.reset_mock() + try: + self.watcher.run() + except Exception as e: + assert e.args == ('sentinel',) + + mock_underscore_run.assert_called_once_with(mock.ANY, '0', mock.ANY, mock.ANY)