Description
Apache Airflow version
3.0.2
If "Other Airflow 2 version" selected, which one?
No response
What happened?
When using the Kubernetes Executor with tasks that define on_failure_callback, a Kubernetes pod that crashes or fails leaves the task stuck in an infinite loop instead of being marked as failed. The scheduler tries to run the failure callback by generating a TaskCallbackRequest, but the handling of TaskCallbackRequest in dag_processing/processor.py is not implemented (it raises a NotImplementedError), so the task state never transitions to FAILED.
The logs show the following errors repeating:
What you think should happen instead?
In previous versions, handling the on_failure_callback request also changed the task state. That handling is currently unimplemented:
    if isinstance(request, TaskCallbackRequest):
        raise NotImplementedError(
            "Haven't coded Task callback yet - https://github.com/apache/airflow/issues/44354!"
        )
        # _execute_task_callbacks(dagbag, request)
This results in the task state never being changed, requiring manual intervention. The proper implementation should handle the TaskCallbackRequest, execute the callback if defined, and most importantly, ensure the task state transitions to FAILED regardless of the callback's execution status.
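The ordering requested above can be sketched without any Airflow internals. This is only an illustration under stated assumptions, not the Airflow implementation: `FakeTaskInstance`, `handle_task_callback`, and the state strings are hypothetical names introduced here. It shows the callback being attempted, any exception it raises being logged, and the state being set to failed regardless.

```python
import logging
from dataclasses import dataclass
from typing import Any, Callable, Optional

log = logging.getLogger(__name__)


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in for a task instance; not an Airflow class."""

    task_id: str
    state: str = "running"


def handle_task_callback(
    ti: FakeTaskInstance,
    on_failure_callback: Optional[Callable[[dict], Any]] = None,
) -> None:
    """Run the failure callback if one is defined, then always mark the task failed."""
    try:
        if on_failure_callback is not None:
            on_failure_callback({"task_instance": ti})
    except Exception:
        # A broken callback must not block the state transition.
        log.exception("on_failure_callback raised for %s", ti.task_id)
    finally:
        ti.state = "failed"
```

Putting the state change in `finally` is the design choice that matters: the transition happens whether the callback is missing, succeeds, or raises.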
How to reproduce
    from airflow import DAG
    from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
    from kubernetes.client import models as k8s


    def failure_callback(context):
        # Double quotes outside so the nested single quotes parse on Python < 3.12.
        print(f"Task failed: {context['task_instance']}")


    with DAG('test_failure_callback', schedule='@daily') as dag:
        task = KubernetesPodOperator(
            task_id='failing_task',
            name='failing_task',
            namespace='airflow',
            image='python:3.11-slim',
            cmds=['bash', '-c'],
            # Sleep long enough to exec into the pod and trigger the crash manually.
            arguments=['python -c "import time; time.sleep(300);"'],
            labels={'example': 'test'},
            container_resources=k8s.V1ResourceRequirements(
                requests={'cpu': '500m', 'memory': '500Mi'},
                limits={'cpu': '500m', 'memory': '500Mi'},
            ),
            on_failure_callback=failure_callback,
        )
After the Kubernetes executor starts the pod, exec into it and run Python code that exhausts the container's memory, so that the pod is OOM-killed. This simulates a pod crash.
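One possible way to trigger the OOM, shown as a rough sketch rather than the exact code used for this report: `allocate_until` is a hypothetical helper that keeps allocating and holding memory. With `max_bytes=None` it never returns and should be stopped by the container's memory limit (500Mi in the DAG above). The optional cap exists only so the function can be tried safely outside the pod.

```python
def allocate_until(max_bytes=None, chunk_bytes=50 * 1024 * 1024):
    """Allocate and hold memory in chunks.

    With max_bytes=None the loop never ends on its own; inside a
    memory-limited container the process is expected to be OOM-killed.
    Returns the number of bytes held when a cap is given.
    """
    chunks = []
    total = 0
    while max_bytes is None or total + chunk_bytes <= max_bytes:
        # b"x" * n writes every byte, so the pages are actually resident.
        chunks.append(b"x" * chunk_bytes)
        total += chunk_bytes
    return total
```

Inside the pod, calling `allocate_until()` with no arguments from a Python shell should be enough to exceed the limit.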
Operating System
Linux
Versions of Apache Airflow Providers
apache-airflow-providers-amazon==9.8.0
apache-airflow-providers-celery==3.11.0
apache-airflow-providers-cncf-kubernetes==10.5.0
apache-airflow-providers-common-compat==1.7.0
apache-airflow-providers-common-io==1.6.0
apache-airflow-providers-common-messaging==1.0.3
apache-airflow-providers-common-sql==1.27.1
apache-airflow-providers-docker==4.4.0
apache-airflow-providers-elasticsearch==6.3.0
apache-airflow-providers-fab==2.2.1
apache-airflow-providers-ftp==3.13.0
apache-airflow-providers-git==0.0.2
apache-airflow-providers-google==15.1.0
apache-airflow-providers-grpc==3.8.0
apache-airflow-providers-hashicorp==4.2.0
apache-airflow-providers-http==5.3.0
apache-airflow-providers-microsoft-azure==12.4.0
apache-airflow-providers-mysql==6.3.0
apache-airflow-providers-odbc==4.10.0
apache-airflow-providers-openlineage==2.3.0
apache-airflow-providers-postgres==6.2.0
apache-airflow-providers-redis==4.1.0
apache-airflow-providers-sendgrid==4.1.0
apache-airflow-providers-sftp==5.3.0
apache-airflow-providers-slack==9.1.0
apache-airflow-providers-smtp==2.1.0
apache-airflow-providers-snowflake==6.3.1
apache-airflow-providers-ssh==4.1.0
apache-airflow-providers-standard==1.2.0
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct