-
Notifications
You must be signed in to change notification settings - Fork 14.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Task should fail immediately when pod is unprocessable #19359
Conversation
When pod has invalid requirements, e.g. resource limit < resource request, the kubernetes api may return "Unprocessable Entity". In this scenario, the kubernetes executor should fail the task immediately, rather than set it to be attempted again
Co-authored-by: Ephraim Anierobi <splendidzigy24@gmail.com>
kubernetes_executor.execute_async( | ||
key=('dag', 'task', 'run_id', try_number), | ||
key=task_instance_key, | ||
queue=None, | ||
command=['airflow', 'tasks', 'run', 'true', 'some_parameter'], | ||
) | ||
kubernetes_executor.sync() | ||
kubernetes_executor.sync() | ||
|
||
assert mock_kube_client.create_namespaced_pod.called |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assert mock_kube_client.create_namespaced_pod.called | |
mock_kube_client.create_namespaced_pod.assert_called_once() |
or
assert mock_kube_client.create_namespaced_pod.called | |
assert mock_kube_client.create_namespaced_pod.call_count == 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah it's odd...
the test actually calls sync 2 times prior to this line
so depending on the scenario call count could be 2.
but i will remove one of the sync calls and assert == 1... it seems that must have been a mistake.
Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher') | ||
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client') | ||
def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher): | ||
def test_run_next_exception_requeue( | ||
self, mock_get_kube_client, mock_kubernetes_job_watcher, reason, status, should_requeue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reason
isn't used, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it provides a human-friendly test name. this is a pattern i've seen in other airflow tests, though perhaps more commonly the param would be called name
. wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok updated
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease. |
When pod has invalid requirements, e.g. resource limit < resource request,
the kubernetes api may return "Unprocessable Entity". In this scenario,
the kubernetes executor should fail the task immediately, rather than set
it to be attempted again
closes #19320