Task should fail immediately when pod is unprocessable #19359

Merged: 10 commits, Nov 4, 2021
airflow/executors/kubernetes_executor.py (10 changes: 7 additions & 3 deletions)
@@ -599,13 +599,17 @@ def sync(self) -> None:
 try:
     self.kube_scheduler.run_next(task)
 except ApiException as e:
-    if e.reason == "BadRequest":
-        self.log.error("Request was invalid. Failing task")
+
+    # These codes indicate something is wrong with pod definition; otherwise we assume pod
+    # definition is ok, and that retrying may work
+    if e.status in (400, 422):
+        self.log.error("Pod creation failed with reason %r. Failing task", e.reason)
         key, _, _, _ = task
         self.change_state(key, State.FAILED, e)
     else:
         self.log.warning(
-            'ApiException when attempting to run task, re-queueing. Message: %s',
+            'ApiException when attempting to run task, re-queueing. Reason: %r. Message: %s',
+            e.reason,
             json.loads(e.body)['message'],
         )
         self.task_queue.put(task)
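The effect of this change, in isolation: a minimal sketch of the new fail-fast check, built the same way the updated test below builds its `ApiException` from an `HTTPResponse`. `should_fail_task` is an illustrative helper, not executor code; it simply restates the `e.status in (400, 422)` condition added above.

```python
from urllib3 import HTTPResponse
from kubernetes.client.rest import ApiException


def should_fail_task(exc: ApiException) -> bool:
    # A 400 or 422 means the pod definition itself is bad, so retrying cannot help.
    # Anything else (e.g. 403 when a namespace quota is exceeded) may be transient.
    return exc.status in (400, 422)


# Build exceptions the same way the test does: from a canned urllib3 HTTPResponse.
quota_exceeded = ApiException(http_resp=HTTPResponse(body='{"message": "exceeded quota"}', status=403))
unprocessable = ApiException(http_resp=HTTPResponse(body='{"message": "limits lower than requests"}', status=422))

print(should_fail_task(quota_exceeded))  # False -> task is re-queued and retried on a later sync
print(should_fail_task(unprocessable))   # True  -> task is marked FAILED immediately
```

Any status the executor does not recognise (the parametrized `12345 fake-unhandled-reason` case below) takes the same re-queue path as 403.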
tests/executors/test_kubernetes_executor.py (75 changes: 51 additions & 24 deletions)
@@ -182,33 +182,52 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl
         mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
 
 
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
     """
     Tests if an ApiException from the Kube Client will cause the task to
     be rescheduled.
     """
 
-    def setUp(self) -> None:
+    def setup_method(self) -> None:
         self.kubernetes_executor = KubernetesExecutor()
         self.kubernetes_executor.job_id = "5"
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
+    @pytest.mark.parametrize(
+        'status, should_requeue',
+        [
+            pytest.param(403, True, id='403 Forbidden'),
+            pytest.param(12345, True, id='12345 fake-unhandled-reason'),
+            pytest.param(422, False, id='422 Unprocessable Entity'),
+            pytest.param(400, False, id='400 BadRequest'),
+        ],
+    )
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
-    def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):
+    def test_run_next_exception_requeue(
+        self, mock_get_kube_client, mock_kubernetes_job_watcher, status, should_requeue
+    ):
+        """
+        When pod scheduling fails with either reason 'Forbidden', or any reason not yet
+        handled in the relevant try-except block, the task should stay in the ``task_queue``
+        and be attempted on a subsequent executor sync. When reason is 'Unprocessable Entity'
+        or 'BadRequest', the task should be failed without being re-queued.
+
+        Note on error scenarios:
+
+        - 403 Forbidden will be returned when your request exceeds namespace quota.
+        - 422 Unprocessable Entity is returned when your parameters are valid but unsupported
+            e.g. limits lower than requests.
+        - 400 BadRequest is returned when your parameters are invalid e.g. asking for cpu=100ABC123.
+
+        """
         import sys
 
         path = sys.path[0] + '/tests/kubernetes/pod_generator_base_with_secrets.yaml'
 
-        # When a quota is exceeded this is the ApiException we get
-        response = HTTPResponse(
-            body='{"kind": "Status", "apiVersion": "v1", "metadata": {}, "status": "Failure", '
-            '"message": "pods \\"podname\\" is forbidden: exceeded quota: compute-resources, '
-            'requested: limits.memory=4Gi, used: limits.memory=6508Mi, limited: limits.memory=10Gi", '
-            '"reason": "Forbidden", "details": {"name": "podname", "kind": "pods"}, "code": 403}'
-        )
-        response.status = 403
-        response.reason = "Forbidden"
+        response = HTTPResponse(body='{"message": "any message"}', status=status)
 
         # A mock kube_client that throws errors when making a pod
         mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', autospec=True)
@@ -225,24 +244,30 @@ def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watc
         kubernetes_executor.start()
         # Execute a task while the Api Throws errors
         try_number = 1
+        task_instance_key = ('dag', 'task', 'run_id', try_number)
         kubernetes_executor.execute_async(
-            key=('dag', 'task', 'run_id', try_number),
+            key=task_instance_key,
             queue=None,
             command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
         )
         kubernetes_executor.sync()
-        kubernetes_executor.sync()
-
         assert mock_kube_client.create_namespaced_pod.called
-        assert not kubernetes_executor.task_queue.empty()
+        assert mock_kube_client.create_namespaced_pod.call_count == 1
 
-        # Disable the ApiException
-        mock_kube_client.create_namespaced_pod.side_effect = None
+        if should_requeue:
+            assert not kubernetes_executor.task_queue.empty()
 
-        # Execute the task without errors should empty the queue
-        kubernetes_executor.sync()
-        assert mock_kube_client.create_namespaced_pod.called
-        assert kubernetes_executor.task_queue.empty()
+            # Disable the ApiException
+            mock_kube_client.create_namespaced_pod.side_effect = None
+
+            # Execute the task without errors should empty the queue
+            mock_kube_client.create_namespaced_pod.reset_mock()
+            kubernetes_executor.sync()
+            assert mock_kube_client.create_namespaced_pod.called
+            assert kubernetes_executor.task_queue.empty()
+        else:
+            assert kubernetes_executor.task_queue.empty()
+            assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED
 
     @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor.sync')
@@ -279,7 +304,9 @@ def test_invalid_executor_config(self, mock_get_kube_client, mock_kubernetes_job
         assert list(executor.event_buffer.values())[0][1] == "Invalid executor_config passed"
 
     @pytest.mark.execution_timeout(10)
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
     @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.run_pod_async')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
     def test_pod_template_file_override_in_executor_config(self, mock_get_kube_client, mock_run_pod_async):
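To make the test docstring's note on error scenarios concrete, here is a hypothetical sketch of pod specs that would typically provoke each status code, written with the kubernetes Python client. The pod names, image, and resource values are invented for illustration, and which code the API server actually returns depends on where its validation rejects the request:

```python
from kubernetes.client import (
    V1Container,
    V1ObjectMeta,
    V1Pod,
    V1PodSpec,
    V1ResourceRequirements,
)

# 422 Unprocessable Entity: the values parse correctly but are not acceptable,
# e.g. a memory limit lower than the memory request.
unprocessable_pod = V1Pod(
    metadata=V1ObjectMeta(name="example-422"),
    spec=V1PodSpec(
        containers=[
            V1Container(
                name="base",
                image="busybox",
                resources=V1ResourceRequirements(
                    requests={"memory": "4Gi"},
                    limits={"memory": "2Gi"},
                ),
            )
        ]
    ),
)

# 400 BadRequest: a value that cannot even be parsed, e.g. cpu=100ABC123.
bad_request_pod = V1Pod(
    metadata=V1ObjectMeta(name="example-400"),
    spec=V1PodSpec(
        containers=[
            V1Container(
                name="base",
                image="busybox",
                resources=V1ResourceRequirements(requests={"cpu": "100ABC123"}),
            )
        ]
    ),
)

# Submitting either pod with CoreV1Api().create_namespaced_pod(...) would raise an
# ApiException whose .status the executor now inspects: fail fast on 400/422,
# re-queue on anything else (403 quota errors, unrecognised codes, ...).
```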