Skip to content

Conversation

@amoghrajesh
Copy link
Contributor

@amoghrajesh amoghrajesh commented Apr 10, 2025

closes: #44951

What?

Dags have a property fail_fast: Fails currently running tasks when task in DAG fails. Warning: A fail stop dag can only have tasks with the default trigger rule (“all_success”). An exception will be thrown if any task in a fail stop dag has a non default trigger rule.

This is a property which is kinda used in a way that is a task fails, all its upstream "running" TIs should fail if they are non teardown.

Approach

  • Since it is a dag level property, i figured thta instead of sending values like: fail_stop / fast_fail is set from the execution side, we can know it earlier itself during parse time.
  • So it makes sense to store it in the serdag but not as a new column as it may affect versioning but insterad we will store it in the "data" part which is the serialised dag in dict form
  • Now when a task is running and IT fails, during the update_state call in the api server, we will check if it is from a fail stop dag, if yes, we construct the information needed for the exisiting logic: https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/taskinstance.py#L395-L423 to mark all the downstream TIs as failed.

This information involves:

Testing

DAG:

from airflow import DAG

from datetime import datetime
import time

from airflow.providers.standard.operators.python import PythonOperator


def start_fn():
    import time
    time.sleep(5)
    assert 1 == 0

with DAG(
    dag_id="fail_fast_dag",
    schedule=None,
    tags=["fail_fast"],
    start_date=datetime(2024, 1, 1),
    catchup=False,
    fail_fast=True
) as dag:

    start = PythonOperator(
        task_id="start",
        python_callable=start_fn,
    )

    end = PythonOperator(
        task_id="end",
        python_callable=lambda: time.sleep(10),
    )

    end1 = PythonOperator(
        task_id="end1",
        python_callable=lambda: time.sleep(10),
    )

    end2 = PythonOperator(
        task_id="end2",
        python_callable=lambda: time.sleep(10),
    )

    [start, end, end1, end2]

  • Added the "fail_fast" property to serialised dags.

Before:


{"__version": 1, "dag": {"edge_info": {}, "disable_bundle_versioning": false, "start_date": 1704067200.0, "tags": ["fail_fast"], "dag_id": "fail_fast_dag", "timetable": {"__type": "airflow.timetables.simple.NullTimetable", "__var": {}}, "relative_fileloc": "dags/fail_fast.py", "timezone": "UTC", "fileloc": "/files/dags/dags/fail_fast.py", "catchup": false, "task_group": {"_group_id": null, "group_display_name": "", "prefix_group_id": true, "tooltip": "", "ui_color": "CornflowerBlue", "ui_fgcolor": "#000", "children": {"start": ["operator", "start"], "end": ["operator", "end"], "end1": ["operator", "end1"], "end2": ["operator", "end2"]}, "upstream_group_ids": [], "downstream_group_ids": [], "upstream_task_ids": [], "downstream_task_ids": []}, "_processor_dags_folder": "/files/dags", "tasks": [{"__var": {"pool": "default_pool", "template_fields": ["templates_dict", "op_args", "op_kwargs"], "ui_fgcolor": "#000", "is_setup": false, "weight_rule": "downstream", "ui_color": "#ffefeb", "template_ext": [], "_needs_expansion": false, "task_id": "start", "is_teardown": false, "start_from_trigger": false, "task_type": "PythonOperator", "downstream_task_ids": [], "template_fields_renderers": {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"}, "on_failure_fail_dagrun": false, "python_callable_name": "unusual_prefix_da9c2c228bc9403a99c6a97b86b9947d9ec0ca62_fail_fast.start_fn", "_task_module": "airflow.providers.standard.operators.python", "_is_empty": false, "_can_skip_downstream": false, "start_trigger_args": null, "op_args": [], "op_kwargs": {}}, "__type": "operator"}, {"__var": {"pool": "default_pool", "template_fields": ["templates_dict", "op_args", "op_kwargs"], "ui_fgcolor": "#000", "is_setup": false, "weight_rule": "downstream", "ui_color": "#ffefeb", "template_ext": [], "_needs_expansion": false, "task_id": "end", "is_teardown": false, "start_from_trigger": false, "task_type": "PythonOperator", "downstream_task_ids": [], "template_fields_renderers": {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"}, "on_failure_fail_dagrun": false, "python_callable_name": "unusual_prefix_da9c2c228bc9403a99c6a97b86b9947d9ec0ca62_fail_fast.<lambda>", "_task_module": "airflow.providers.standard.operators.python", "_is_empty": false, "_can_skip_downstream": false, "start_trigger_args": null, "op_args": [], "op_kwargs": {}}, "__type": "operator"}, {"__var": {"pool": "default_pool", "template_fields": ["templates_dict", "op_args", "op_kwargs"], "ui_fgcolor": "#000", "is_setup": false, "weight_rule": "downstream", "ui_color": "#ffefeb", "template_ext": [], "_needs_expansion": false, "task_id": "end1", "is_teardown": false, "start_from_trigger": false, "task_type": "PythonOperator", "downstream_task_ids": [], "template_fields_renderers": {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"}, "on_failure_fail_dagrun": false, "python_callable_name": "unusual_prefix_da9c2c228bc9403a99c6a97b86b9947d9ec0ca62_fail_fast.<lambda>", "_task_module": "airflow.providers.standard.operators.python", "_is_empty": false, "_can_skip_downstream": false, "start_trigger_args": null, "op_args": [], "op_kwargs": {}}, "__type": "operator"}, {"__var": {"pool": "default_pool", "template_fields": ["templates_dict", "op_args", "op_kwargs"], "ui_fgcolor": "#000", "is_setup": false, "weight_rule": "downstream", "ui_color": "#ffefeb", "template_ext": [], "_needs_expansion": false, "task_id": "end2", "is_teardown": false, "start_from_trigger": false, "task_type": "PythonOperator", "downstream_task_ids": [], "template_fields_renderers": {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"}, "on_failure_fail_dagrun": false, "python_callable_name": "unusual_prefix_da9c2c228bc9403a99c6a97b86b9947d9ec0ca62_fail_fast.<lambda>", "_task_module": "airflow.providers.standard.operators.python", "_is_empty": false, "_can_skip_downstream": false, "start_trigger_args": null, "op_args": [], "op_kwargs": {}}, "__type": "operator"}], "dag_dependencies": [], "params": []}}

After:

{"__version": 1, "dag": {"relative_fileloc": "dags/fail_fast.py", "disable_bundle_versioning": false, "edge_info": {}, "timetable": {"__type": "airflow.timetables.simple.NullTimetable", "__var": {}}, "tags": ["fail_fast"], "fail_fast": true, "task_group": {"_group_id": null, "group_display_name": "", "prefix_group_id": true, "tooltip": "", "ui_color": "CornflowerBlue", "ui_fgcolor": "#000", "children": {"start": ["operator", "start"], "end": ["operator", "end"], "end1": ["operator", "end1"], "end2": ["operator", "end2"]}, "upstream_group_ids": [], "downstream_group_ids": [], "upstream_task_ids": [], "downstream_task_ids": []}, "timezone": "UTC", "dag_id": "fail_fast_dag", "catchup": false, "fileloc": "/files/dags/dags/fail_fast.py", "start_date": 1704067200.0, "_processor_dags_folder": "/files/dags", "tasks": [{"__var": {"is_setup": false, "ui_fgcolor": "#000", "_needs_expansion": false, "downstream_task_ids": [], "weight_rule": "downstream", "task_type": "PythonOperator", "template_fields_renderers": {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"}, "on_failure_fail_dagrun": false, "pool": "default_pool", "template_ext": [], "template_fields": ["templates_dict", "op_args", "op_kwargs"], "is_teardown": false, "task_id": "start", "start_from_trigger": false, "ui_color": "#ffefeb", "python_callable_name": "unusual_prefix_da9c2c228bc9403a99c6a97b86b9947d9ec0ca62_fail_fast.start_fn", "_task_module": "airflow.providers.standard.operators.python", "_is_empty": false, "_can_skip_downstream": false, "start_trigger_args": null, "op_args": [], "op_kwargs": {}}, "__type": "operator"}, {"__var": {"is_setup": false, "ui_fgcolor": "#000", "_needs_expansion": false, "downstream_task_ids": [], "weight_rule": "downstream", "task_type": "PythonOperator", "template_fields_renderers": {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"}, "on_failure_fail_dagrun": false, "pool": "default_pool", "template_ext": [], "template_fields": ["templates_dict", "op_args", "op_kwargs"], "is_teardown": false, "task_id": "end", "start_from_trigger": false, "ui_color": "#ffefeb", "python_callable_name": "unusual_prefix_da9c2c228bc9403a99c6a97b86b9947d9ec0ca62_fail_fast.<lambda>", "_task_module": "airflow.providers.standard.operators.python", "_is_empty": false, "_can_skip_downstream": false, "start_trigger_args": null, "op_args": [], "op_kwargs": {}}, "__type": "operator"}, {"__var": {"is_setup": false, "ui_fgcolor": "#000", "_needs_expansion": false, "downstream_task_ids": [], "weight_rule": "downstream", "task_type": "PythonOperator", "template_fields_renderers": {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"}, "on_failure_fail_dagrun": false, "pool": "default_pool", "template_ext": [], "template_fields": ["templates_dict", "op_args", "op_kwargs"], "is_teardown": false, "task_id": "end1", "start_from_trigger": false, "ui_color": "#ffefeb", "python_callable_name": "unusual_prefix_da9c2c228bc9403a99c6a97b86b9947d9ec0ca62_fail_fast.<lambda>", "_task_module": "airflow.providers.standard.operators.python", "_is_empty": false, "_can_skip_downstream": false, "start_trigger_args": null, "op_args": [], "op_kwargs": {}}, "__type": "operator"}, {"__var": {"is_setup": false, "ui_fgcolor": "#000", "_needs_expansion": false, "downstream_task_ids": [], "weight_rule": "downstream", "task_type": "PythonOperator", "template_fields_renderers": {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"}, "on_failure_fail_dagrun": false, "pool": "default_pool", "template_ext": [], "template_fields": ["templates_dict", "op_args", "op_kwargs"], "is_teardown": false, "task_id": "end2", "start_from_trigger": false, "ui_color": "#ffefeb", "python_callable_name": "unusual_prefix_da9c2c228bc9403a99c6a97b86b9947d9ec0ca62_fail_fast.<lambda>", "_task_module": "airflow.providers.standard.operators.python", "_is_empty": false, "_can_skip_downstream": false, "start_trigger_args": null, "op_args": [], "op_kwargs": {}}, "__type": "operator"}], "dag_dependencies": [], "params": []}}

Task 1 fails cos of wrong assert
image

All other running tasks, end, end1 and end2 fail:
image

image

image

Audit log entries:
image


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@amoghrajesh
Copy link
Contributor Author

Threw in logic to update the audit log too. Here's how it looks now:
image

@amoghrajesh
Copy link
Contributor Author

Okay i just pushed another commit which imo will handle the tests now. Do we need a cadwyn migration?

(New field "Request" has been added to the api)

@amoghrajesh amoghrajesh requested a review from ashb April 10, 2025 13:46
@amoghrajesh amoghrajesh changed the title Handle 'fail_fast' dags with task sdk Handle 'fail_fast' dags with task sdk and execution API server Apr 10, 2025
@amoghrajesh
Copy link
Contributor Author

The failure isnt related to the PR, example of another one https://github.com/apache/airflow/actions/runs/14382075973/job/40328612380?pr=49059

It is being handled as we speak

@amoghrajesh amoghrajesh merged commit aaf1bee into apache:main Apr 10, 2025
83 of 84 checks passed
@amoghrajesh amoghrajesh deleted the handling-fast-fail-dags branch April 10, 2025 14:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Handle fail_stop / non teardown tasks in TASK SDK

2 participants