Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent KubernetesJobWatcher getting stuck on resource too old #23521

Merged
merged 2 commits into from
May 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ def run(self) -> None:
time.sleep(1)
except Exception:
self.log.exception('Unknown error in KubernetesJobWatcher. Failing')
self.resource_version = "0"
ResourceVersion().resource_version = "0"
raise
else:
self.log.warning(
Expand Down Expand Up @@ -288,6 +290,7 @@ def _health_check_kube_watcher(self):
self.log.error(
'Error while health checking kube watcher process. Process died for unknown reasons'
)
ResourceVersion().resource_version = "0"
self.kube_watcher = self._make_kube_watcher()

def run_next(self, next_job: KubernetesJobType) -> None:
Expand Down
34 changes: 34 additions & 0 deletions tests/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
AirflowKubernetesScheduler,
KubernetesExecutor,
KubernetesJobWatcher,
ResourceVersion,
create_pod_id,
get_base_pod_from_template,
)
Expand Down Expand Up @@ -957,3 +958,36 @@ def test_process_error_event_for_raise_if_not_410(self):
f"Kubernetes failure for {raw_object['reason']} "
f"with code {raw_object['code']} and message: {raw_object['message']}"
)

def test_recover_from_resource_too_old(self):
# too old resource
mock_underscore_run = mock.MagicMock()

def effect():
yield '500'
while True:
yield Exception('sentinel')

mock_underscore_run.side_effect = effect()

self.watcher._run = mock_underscore_run

with mock.patch('airflow.executors.kubernetes_executor.get_kube_client'):
try:
# self.watcher._run() is mocked and return "500" as last resource_version
self.watcher.run()
except Exception as e:
assert e.args == ('sentinel',)

# both resource_version should be 0 after _run raises and exception
assert self.watcher.resource_version == '0'
assert ResourceVersion().resource_version == '0'

# check that in the next run, _run is invoked with resource_version = 0
mock_underscore_run.reset_mock()
try:
self.watcher.run()
except Exception as e:
assert e.args == ('sentinel',)

mock_underscore_run.assert_called_once_with(mock.ANY, '0', mock.ANY, mock.ANY)